// http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs
// ----------------------------------------------------------------------
// diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs
// new file mode 100644
// index 0000000..037ac85
// --- /dev/null
// +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs
// @@ -0,0 +1,923 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Portable
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using Apache.Ignite.Core.Common;
    using Apache.Ignite.Core.Impl.Portable.IO;
    using Apache.Ignite.Core.Impl.Portable.Metadata;
    using Apache.Ignite.Core.Portable;

    /// <summary>
    /// Portable builder implementation. Re-writes the serialized form of an existing portable
    /// object into a new stream, replacing, adding or removing fields on the way.
    /// </summary>
    internal class PortableBuilderImpl : IPortableBuilder
    {
        /** Type IDs for metadata. */
        private static readonly IDictionary<Type, int> TypeIds;

        /** Cached dictionary with no values. It is shared, so it must never be modified. */
        private static readonly IDictionary<int, object> EmptyVals = new Dictionary<int, object>();

        /**
         * Offset of the length field from the object start. Mutate0 writes a header byte, a user
         * flag byte, an int type ID and an int hash code (1 + 1 + 4 + 4 bytes) in front of it.
         */
        private const int OffsetLen = 10;

        /** Portables. */
        private readonly PortablesImpl _portables;

        /** Root builder of the builder group; the builder itself when it has no parent. */
        private readonly PortableBuilderImpl _parent;

        /** Initial portable object. */
        private readonly PortableUserObject _obj;

        /** Type descriptor. */
        private readonly IPortableTypeDescriptor _desc;

        /** Values. */
        private IDictionary<string, PortableBuilderField> _vals;

        /** Contextual fields, keyed by position in the initial object data. */
        private IDictionary<int, object> _cache;

        /** Hash code. */
        private int _hashCode;

        /** Current context. */
        private Context _ctx;

        /// <summary>
        /// Static initializer.
        /// </summary>
        static PortableBuilderImpl()
        {
            TypeIds = new Dictionary<Type, int>();

            // 1. Primitives.
            TypeIds[typeof(byte)] = PortableUtils.TypeByte;
            TypeIds[typeof(bool)] = PortableUtils.TypeBool;
            TypeIds[typeof(short)] = PortableUtils.TypeShort;
            TypeIds[typeof(char)] = PortableUtils.TypeChar;
            TypeIds[typeof(int)] = PortableUtils.TypeInt;
            TypeIds[typeof(long)] = PortableUtils.TypeLong;
            TypeIds[typeof(float)] = PortableUtils.TypeFloat;
            TypeIds[typeof(double)] = PortableUtils.TypeDouble;
            TypeIds[typeof(decimal)] = PortableUtils.TypeDecimal;

            TypeIds[typeof(byte[])] = PortableUtils.TypeArrayByte;
            TypeIds[typeof(bool[])] = PortableUtils.TypeArrayBool;
            TypeIds[typeof(short[])] = PortableUtils.TypeArrayShort;
            TypeIds[typeof(char[])] = PortableUtils.TypeArrayChar;
            TypeIds[typeof(int[])] = PortableUtils.TypeArrayInt;
            TypeIds[typeof(long[])] = PortableUtils.TypeArrayLong;
            TypeIds[typeof(float[])] = PortableUtils.TypeArrayFloat;
            TypeIds[typeof(double[])] = PortableUtils.TypeArrayDouble;
            TypeIds[typeof(decimal[])] = PortableUtils.TypeArrayDecimal;

            // 2. String.
            TypeIds[typeof(string)] = PortableUtils.TypeString;
            TypeIds[typeof(string[])] = PortableUtils.TypeArrayString;

            // 3. Guid.
            TypeIds[typeof(Guid)] = PortableUtils.TypeGuid;
            TypeIds[typeof(Guid?)] = PortableUtils.TypeGuid;
            TypeIds[typeof(Guid[])] = PortableUtils.TypeArrayGuid;
            TypeIds[typeof(Guid?[])] = PortableUtils.TypeArrayGuid;

            // 4. Date.
            TypeIds[typeof(DateTime)] = PortableUtils.TypeDate;
            TypeIds[typeof(DateTime?)] = PortableUtils.TypeDate;
            TypeIds[typeof(DateTime[])] = PortableUtils.TypeArrayDate;
            TypeIds[typeof(DateTime?[])] = PortableUtils.TypeArrayDate;
        }

        /// <summary>
        /// Constructor for a builder without a parent.
        /// </summary>
        /// <param name="portables">Portables.</param>
        /// <param name="obj">Initial portable object.</param>
        /// <param name="desc">Type descriptor.</param>
        public PortableBuilderImpl(PortablesImpl portables, PortableUserObject obj,
            IPortableTypeDescriptor desc) : this(portables, null, obj, desc)
        {
            // No-op.
        }

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="portables">Portables.</param>
        /// <param name="parent">Parent builder; when <c>null</c> the builder is its own parent.</param>
        /// <param name="obj">Initial portable object.</param>
        /// <param name="desc">Type descriptor.</param>
        public PortableBuilderImpl(PortablesImpl portables, PortableBuilderImpl parent,
            PortableUserObject obj, IPortableTypeDescriptor desc)
        {
            _portables = portables;
            _parent = parent ?? this;
            _obj = obj;
            _desc = desc;

            _hashCode = obj.GetHashCode();
        }

        /** <inheritDoc /> */
        public IPortableBuilder HashCode(int hashCode)
        {
            _hashCode = hashCode;

            return this;
        }

        /** <inheritDoc /> */
        public T GetField<T>(string name)
        {
            PortableBuilderField field;

            if (_vals != null && _vals.TryGetValue(name, out field))
                return field != PortableBuilderField.RmvMarker ? (T)field.Value : default(T);

            T val = _obj.Field<T>(name, this);

            if (_vals == null)
                _vals = new Dictionary<string, PortableBuilderField>(2);

            // Remember the value that was handed out so that later reads return the same instance.
            _vals[name] = new PortableBuilderField(typeof(T), val);

            return val;
        }

        /** <inheritDoc /> */
        public IPortableBuilder SetField<T>(string name, T val)
        {
            return SetField0(name, new PortableBuilderField(typeof(T), val));
        }

        /** <inheritDoc /> */
        public IPortableBuilder RemoveField(string name)
        {
            return SetField0(name, PortableBuilderField.RmvMarker);
        }

        /** <inheritDoc /> */
        public IPortableObject Build()
        {
            // Assume that resulting length will be no less than header + [fields_cnt] * 12;
            int len = PortableUtils.FullHdrLen + (_vals == null ? 0 : _vals.Count * 12);

            PortableHeapStream outStream = new PortableHeapStream(len);

            PortableWriterImpl writer = _portables.Marshaller.StartMarshal(outStream);

            writer.Builder(this);

            // All related builders will work in this context with this writer.
            _parent._ctx = new Context(writer);

            try
            {
                // Write.
                writer.Write(this, null);

                // Process metadata.
                _portables.Marshaller.FinishMarshal(writer);

                // Create portable object once metadata is processed.
                return new PortableUserObject(_portables.Marshaller, outStream.InternalArray, 0,
                    _desc.TypeId, _hashCode);
            }
            finally
            {
                // Cleanup.
                _parent._ctx.Closed = true;
            }
        }

        /// <summary>
        /// Create child builder.
        /// </summary>
        /// <param name="obj">Portable object.</param>
        /// <returns>Child builder.</returns>
        public PortableBuilderImpl Child(PortableUserObject obj)
        {
            return _portables.ChildBuilder(_parent, obj);
        }

        /// <summary>
        /// Get cached field.
        /// </summary>
        /// <param name="pos">Position.</param>
        /// <param name="val">Value.</param>
        /// <returns><c>true</c> if value is found in cache.</returns>
        public bool CachedField<T>(int pos, out T val)
        {
            if (_parent._cache != null)
            {
                object res;

                if (_parent._cache.TryGetValue(pos, out res))
                {
                    val = res != null ? (T)res : default(T);

                    return true;
                }
            }

            val = default(T);

            return false;
        }

        /// <summary>
        /// Add field to cache.
        /// </summary>
        /// <param name="pos">Position.</param>
        /// <param name="val">Value.</param>
        public void CacheField(int pos, object val)
        {
            if (_parent._cache == null)
                _parent._cache = new Dictionary<int, object>(2);

            _parent._cache[pos] = val;
        }

        /// <summary>
        /// Internal set field routine.
        /// </summary>
        /// <param name="fieldName">Name.</param>
        /// <param name="val">Value.</param>
        /// <returns>This builder.</returns>
        private IPortableBuilder SetField0(string fieldName, PortableBuilderField val)
        {
            if (_vals == null)
                _vals = new Dictionary<string, PortableBuilderField>();

            _vals[fieldName] = val;

            return this;
        }

        /// <summary>
        /// Mutate portable object.
        /// </summary>
        /// <param name="inStream">Input stream with initial object.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="desc">Portable type descriptor.</param>
        /// <param name="hashCode">Hash code.</param>
        /// <param name="vals">Values.</param>
        internal void Mutate(
            PortableHeapStream inStream,
            PortableHeapStream outStream,
            IPortableTypeDescriptor desc,
            int hashCode,
            IDictionary<string, PortableBuilderField> vals)
        {
            // Set correct builder to writer frame.
            PortableBuilderImpl oldBuilder = _parent._ctx.Writer.Builder(_parent);

            int streamPos = inStream.Position;

            try
            {
                // Prepare fields.
                IPortableMetadataHandler metaHnd = _portables.Marshaller.MetadataHandler(desc);

                IDictionary<int, object> vals0;

                if (vals == null || vals.Count == 0)
                    vals0 = EmptyVals;
                else
                {
                    vals0 = new Dictionary<int, object>(vals.Count);

                    foreach (KeyValuePair<string, PortableBuilderField> valEntry in vals)
                    {
                        int fieldId = PortableUtils.FieldId(desc.TypeId, valEntry.Key, desc.NameConverter, desc.Mapper);

                        if (vals0.ContainsKey(fieldId))
                            throw new IgniteException("Collision in field ID detected (change field name or " +
                                "define custom ID mapper) [fieldName=" + valEntry.Key + ", fieldId=" + fieldId + ']');

                        vals0[fieldId] = valEntry.Value.Value;

                        // Write metadata if: 1) it is enabled for type; 2) the field carries a type.
                        if (metaHnd != null && valEntry.Value.Type != null)
                            metaHnd.OnFieldWrite(fieldId, valEntry.Key, TypeId(valEntry.Value.Type));
                    }
                }

                // Actual processing.
                Mutate0(_parent._ctx, inStream, outStream, true, hashCode, vals0);

                // Handle metadata.
                if (metaHnd != null)
                {
                    IDictionary<string, int> meta = metaHnd.OnObjectWriteFinished();

                    if (meta != null)
                        _parent._ctx.Writer.SaveMetadata(desc.TypeId, desc.TypeName, desc.AffinityKeyFieldName, meta);
                }
            }
            finally
            {
                // Restore builder frame.
                _parent._ctx.Writer.Builder(oldBuilder);

                inStream.Seek(streamPos, SeekOrigin.Begin);
            }
        }

        /// <summary>
        /// Internal mutation routine. Copies one value from the input stream to the output stream,
        /// recursing into nested values and keeping handles consistent with the new layout.
        /// </summary>
        /// <param name="ctx">Context.</param>
        /// <param name="inStream">Input stream.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="changeHash">Whether hash should be changed.</param>
        /// <param name="hash">New hash.</param>
        /// <param name="vals">Values to be replaced; <c>null</c> is treated as "no replacements".</param>
        private void Mutate0(Context ctx, PortableHeapStream inStream, IPortableStream outStream,
            bool changeHash, int hash, IDictionary<int, object> vals)
        {
            // The full-object branch below reads "vals" unconditionally, so never let null through.
            if (vals == null)
                vals = EmptyVals;

            int inStartPos = inStream.Position;
            int outStartPos = outStream.Position;

            byte inHdr = inStream.ReadByte();

            if (inHdr == PortableUtils.HdrNull)
                outStream.WriteByte(PortableUtils.HdrNull);
            else if (inHdr == PortableUtils.HdrHnd)
            {
                int inHnd = inStream.ReadInt();

                int oldPos = inStartPos - inHnd;
                int newPos;

                if (ctx.OldToNew(oldPos, out newPos))
                {
                    // Handle is still valid.
                    outStream.WriteByte(PortableUtils.HdrHnd);
                    outStream.WriteInt(outStartPos - newPos);
                }
                else
                {
                    // Handle is invalid, write full object.
                    int inRetPos = inStream.Position;

                    inStream.Seek(oldPos, SeekOrigin.Begin);

                    Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);

                    inStream.Seek(inRetPos, SeekOrigin.Begin);
                }
            }
            else if (inHdr == PortableUtils.HdrFull)
            {
                byte inUsrFlag = inStream.ReadByte();
                int inTypeId = inStream.ReadInt();
                int inHash = inStream.ReadInt();
                int inLen = inStream.ReadInt();
                int inRawOff = inStream.ReadInt();

                int hndPos;

                if (ctx.AddOldToNew(inStartPos, outStartPos, out hndPos))
                {
                    // Object could be cached in parent builder.
                    object cachedVal;

                    if (_parent._cache != null && _parent._cache.TryGetValue(inStartPos, out cachedVal))
                    {
                        ctx.Writer.Write(cachedVal, null);
                    }
                    else
                    {
                        // New object, write in full form.
                        outStream.WriteByte(PortableUtils.HdrFull);
                        outStream.WriteByte(inUsrFlag);
                        outStream.WriteInt(inTypeId);
                        outStream.WriteInt(changeHash ? hash : inHash);

                        // Skip length and raw offset as they are not known at this point.
                        outStream.Seek(8, SeekOrigin.Current);

                        // Write regular fields.
                        while (inStream.Position < inStartPos + inRawOff)
                        {
                            int inFieldId = inStream.ReadInt();
                            int inFieldLen = inStream.ReadInt();
                            int inFieldDataPos = inStream.Position;

                            object fieldVal;

                            bool fieldFound = vals.TryGetValue(inFieldId, out fieldVal);

                            // A field mapped to the remove marker is dropped from the output.
                            if (!fieldFound || fieldVal != PortableBuilderField.RmvMarkerObj)
                            {
                                outStream.WriteInt(inFieldId);

                                int fieldLenPos = outStream.Position; // Here we will write length later.

                                outStream.Seek(4, SeekOrigin.Current);

                                if (fieldFound)
                                {
                                    // Replace field with new value; it cannot be the remove marker here.
                                    ctx.Writer.Write(fieldVal, null);

                                    // Consumed, so it is not appended again as a "new" field below.
                                    vals.Remove(inFieldId);
                                }
                                else if (_parent._cache != null &&
                                    _parent._cache.TryGetValue(inFieldDataPos, out fieldVal))
                                {
                                    // Field was requested earlier, we must write the tracked value.
                                    ctx.Writer.Write(fieldVal, null);
                                }
                                else
                                {
                                    // Field is not tracked, re-write as is.
                                    Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
                                }

                                BackpatchFieldLength(outStream, fieldLenPos);
                            }

                            // Position input stream pointer after the field.
                            inStream.Seek(inFieldDataPos + inFieldLen, SeekOrigin.Begin);
                        }

                        // Write remaining new fields.
                        foreach (KeyValuePair<int, object> valEntry in vals)
                        {
                            if (valEntry.Value != PortableBuilderField.RmvMarkerObj)
                            {
                                outStream.WriteInt(valEntry.Key);

                                int fieldLenPos = outStream.Position; // Here we will write length later.

                                outStream.Seek(4, SeekOrigin.Current);

                                ctx.Writer.Write(valEntry.Value, null);

                                BackpatchFieldLength(outStream, fieldLenPos);
                            }
                        }

                        // Write raw data.
                        int rawPos = outStream.Position;

                        outStream.Write(inStream.InternalArray, inStartPos + inRawOff, inLen - inRawOff);

                        // Write length and raw data offset.
                        int outResPos = outStream.Position;

                        outStream.Seek(outStartPos + OffsetLen, SeekOrigin.Begin);

                        outStream.WriteInt(outResPos - outStartPos); // Length.
                        outStream.WriteInt(rawPos - outStartPos); // Raw offset.

                        outStream.Seek(outResPos, SeekOrigin.Begin);
                    }
                }
                else
                {
                    // Object has already been written, write as handle.
                    outStream.WriteByte(PortableUtils.HdrHnd);
                    outStream.WriteInt(outStartPos - hndPos);
                }

                // Synchronize input stream position.
                inStream.Seek(inStartPos + inLen, SeekOrigin.Begin);
            }
            else
            {
                // Try writing as well-known type with fixed size.
                outStream.WriteByte(inHdr);

                if (!WriteAsPredefined(inHdr, inStream, outStream, ctx))
                    throw new IgniteException("Unexpected header [position=" + (inStream.Position - 1) +
                        ", header=" + inHdr + ']');
            }
        }

        /// <summary>
        /// Writes the length of the field that ends at the current output position into the
        /// 4-byte slot reserved for it, then restores the output position.
        /// </summary>
        /// <param name="outStream">Output stream.</param>
        /// <param name="fieldLenPos">Position of the reserved length slot.</param>
        private static void BackpatchFieldLength(IPortableStream outStream, int fieldLenPos)
        {
            int fieldEndPos = outStream.Position;

            outStream.Seek(fieldLenPos, SeekOrigin.Begin);
            outStream.WriteInt(fieldEndPos - fieldLenPos - 4);
            outStream.Seek(fieldEndPos, SeekOrigin.Begin);
        }

        /// <summary>
        /// Process portable object inverting handles if needed.
        /// </summary>
        /// <param name="outStream">Output stream.</param>
        /// <param name="port">Portable.</param>
        internal void ProcessPortable(IPortableStream outStream, PortableUserObject port)
        {
            // Special case: writing portable object with correct inversions.
            PortableHeapStream inStream = new PortableHeapStream(port.Data);

            inStream.Seek(port.Offset, SeekOrigin.Begin);

            // Use fresh context to ensure correct portable inversion.
            Mutate0(new Context(), inStream, outStream, false, 0, EmptyVals);
        }

        /// <summary>
        /// Process child builder.
        /// </summary>
        /// <param name="outStream">Output stream.</param>
        /// <param name="builder">Builder.</param>
        internal void ProcessBuilder(IPortableStream outStream, PortableBuilderImpl builder)
        {
            // Mutate requires a heap stream. Cast eagerly so that a wrong stream type fails here
            // with InvalidCastException instead of a NullReferenceException deep inside Mutate0.
            PortableHeapStream heapOutStream = (PortableHeapStream)outStream;

            PortableHeapStream inStream = new PortableHeapStream(builder._obj.Data);

            inStream.Seek(builder._obj.Offset, SeekOrigin.Begin);

            // Builder parent context might be null only in one case: if we never met this group of
            // builders before. In this case we set context to their parent and track it. Context
            // cleanup will be performed at the very end of build process.
            if (builder._parent._ctx == null || builder._parent._ctx.Closed)
                builder._parent._ctx = new Context(_parent._ctx);

            builder.Mutate(inStream, heapOutStream, builder._desc, builder._hashCode, builder._vals);
        }

        /// <summary>
        /// Write object as a predefined type if possible. The type header byte has already been
        /// written to the output by the caller.
        /// </summary>
        /// <param name="hdr">Header.</param>
        /// <param name="inStream">Input stream.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="ctx">Context.</param>
        /// <returns><c>True</c> if was written.</returns>
        private bool WriteAsPredefined(byte hdr, PortableHeapStream inStream, IPortableStream outStream,
            Context ctx)
        {
            switch (hdr)
            {
                case PortableUtils.TypeByte:
                case PortableUtils.TypeBool:
                    TransferBytes(inStream, outStream, 1);

                    break;

                case PortableUtils.TypeShort:
                case PortableUtils.TypeChar:
                    TransferBytes(inStream, outStream, 2);

                    break;

                case PortableUtils.TypeInt:
                case PortableUtils.TypeFloat:
                    TransferBytes(inStream, outStream, 4);

                    break;

                case PortableUtils.TypeLong:
                case PortableUtils.TypeDouble:
                    TransferBytes(inStream, outStream, 8);

                    break;

                case PortableUtils.TypeDecimal:
                    TransferBytes(inStream, outStream, 4); // Transfer scale.

                    int magLen = inStream.ReadInt(); // Transfer magnitude length.

                    outStream.WriteInt(magLen);

                    TransferBytes(inStream, outStream, magLen); // Transfer magnitude.

                    break;

                case PortableUtils.TypeString:
                    PortableUtils.WriteString(PortableUtils.ReadString(inStream), outStream);

                    break;

                case PortableUtils.TypeGuid:
                    TransferBytes(inStream, outStream, 16);

                    break;

                case PortableUtils.TypeDate:
                    TransferBytes(inStream, outStream, 12);

                    break;

                case PortableUtils.TypeArrayByte:
                case PortableUtils.TypeArrayBool:
                    TransferArray(inStream, outStream, 1);

                    break;

                case PortableUtils.TypeArrayShort:
                case PortableUtils.TypeArrayChar:
                    TransferArray(inStream, outStream, 2);

                    break;

                case PortableUtils.TypeArrayInt:
                case PortableUtils.TypeArrayFloat:
                    TransferArray(inStream, outStream, 4);

                    break;

                case PortableUtils.TypeArrayLong:
                case PortableUtils.TypeArrayDouble:
                    TransferArray(inStream, outStream, 8);

                    break;

                case PortableUtils.TypeArrayDecimal:
                case PortableUtils.TypeArrayString:
                case PortableUtils.TypeArrayGuid:
                case PortableUtils.TypeArrayDate:
                case PortableUtils.TypeArrayEnum:
                case PortableUtils.TypeArray:
                    int arrLen = inStream.ReadInt();

                    outStream.WriteInt(arrLen);

                    // Elements may be full objects, so pass the empty map rather than null:
                    // Mutate0 looks every field up in it.
                    for (int i = 0; i < arrLen; i++)
                        Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);

                    break;

                case PortableUtils.TypeCollection:
                    int colLen = inStream.ReadInt();

                    outStream.WriteInt(colLen);

                    outStream.WriteByte(inStream.ReadByte());

                    for (int i = 0; i < colLen; i++)
                        Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);

                    break;

                case PortableUtils.TypeDictionary:
                    int dictLen = inStream.ReadInt();

                    outStream.WriteInt(dictLen);

                    outStream.WriteByte(inStream.ReadByte());

                    for (int i = 0; i < dictLen; i++)
                    {
                        Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
                        Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
                    }

                    break;

                case PortableUtils.TypeMapEntry:
                    Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
                    Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);

                    break;

                case PortableUtils.TypePortable:
                    TransferArray(inStream, outStream, 1); // Data array.
                    TransferBytes(inStream, outStream, 4); // Offset in array.

                    break;

                case PortableUtils.TypeEnum:
                    TransferBytes(inStream, outStream, 4); // Integer ordinal.

                    break;

                default:
                    return false;
            }

            return true;
        }

        /// <summary>
        /// Gets metadata field type ID for the given type.
        /// </summary>
        /// <param name="type">Type.</param>
        /// <returns>Type ID.</returns>
        private static int TypeId(Type type)
        {
            int typeId;

            if (TypeIds.TryGetValue(type, out typeId))
                return typeId;

            if (type.IsEnum)
                return PortableUtils.TypeEnum;

            if (type.IsArray)
                return type.GetElementType().IsEnum ? PortableUtils.TypeArrayEnum : PortableUtils.TypeArray;

            PortableCollectionInfo colInfo = PortableCollectionInfo.Info(type);

            if (!colInfo.IsAny)
                return PortableUtils.TypeObject;

            return colInfo.IsCollection || colInfo.IsGenericCollection
                ? PortableUtils.TypeCollection
                : PortableUtils.TypeDictionary;
        }

        /// <summary>
        /// Transfer bytes from one stream to another.
        /// </summary>
        /// <param name="inStream">Input stream.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="cnt">Bytes count.</param>
        private static void TransferBytes(PortableHeapStream inStream, IPortableStream outStream, int cnt)
        {
            outStream.Write(inStream.InternalArray, inStream.Position, cnt);

            inStream.Seek(cnt, SeekOrigin.Current);
        }

        /// <summary>
        /// Transfer array of fixed-size elements from one stream to another.
        /// </summary>
        /// <param name="inStream">Input stream.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="elemSize">Element size.</param>
        private static void TransferArray(PortableHeapStream inStream, IPortableStream outStream,
            int elemSize)
        {
            int len = inStream.ReadInt();

            outStream.WriteInt(len);

            TransferBytes(inStream, outStream, elemSize * len);
        }

        /// <summary>
        /// Mutation context.
        /// </summary>
        private class Context
        {
            /** Map from object position in old portable to position in new portable. Created lazily. */
            private IDictionary<int, int> _oldToNew;

            /** Parent context. */
            private readonly Context _parent;

            /** Portable writer. */
            private readonly PortableWriterImpl _writer;

            /** Children contexts. */
            private ICollection<Context> _children;

            /** Closed flag; if context is closed, it can no longer be used. */
            private bool _closed;

            /// <summary>
            /// Constructor for parent context where writer invocation is not expected.
            /// </summary>
            public Context()
            {
                // No-op.
            }

            /// <summary>
            /// Constructor for parent context.
            /// </summary>
            /// <param name="writer">Writer</param>
            public Context(PortableWriterImpl writer)
            {
                _writer = writer;
            }

            /// <summary>
            /// Constructor for a child context; it shares the parent's writer and registers
            /// itself in the parent's children.
            /// </summary>
            /// <param name="parent">Parent context.</param>
            public Context(Context parent)
            {
                _parent = parent;

                _writer = parent._writer;

                if (parent._children == null)
                    parent._children = new List<Context>();

                parent._children.Add(this);
            }

            /// <summary>
            /// Add another old-to-new position mapping.
            /// </summary>
            /// <param name="oldPos">Old position.</param>
            /// <param name="newPos">New position.</param>
            /// <param name="hndPos">Handle position.</param>
            /// <returns><c>True</c> if mapping was added, <c>false</c> if mapping already existed and handle
            /// position in the new object is returned.</returns>
            public bool AddOldToNew(int oldPos, int newPos, out int hndPos)
            {
                if (_oldToNew == null)
                    _oldToNew = new Dictionary<int, int>();

                if (_oldToNew.TryGetValue(oldPos, out hndPos))
                    return false;

                _oldToNew[oldPos] = newPos;

                return true;
            }

            /// <summary>
            /// Get mapping of old position to the new one.
            /// </summary>
            /// <param name="oldPos">Old position.</param>
            /// <param name="newPos">New position.</param>
            /// <returns><c>True</c> if mapping exists.</returns>
            public bool OldToNew(int oldPos, out int newPos)
            {
                // The map is created on first AddOldToNew; until then no mapping can exist.
                if (_oldToNew == null)
                {
                    newPos = 0;

                    return false;
                }

                return _oldToNew.TryGetValue(oldPos, out newPos);
            }

            /// <summary>
            /// Writer.
            /// </summary>
            public PortableWriterImpl Writer
            {
                get { return _writer; }
            }

            /// <summary>
            /// Closed flag. Setting it updates this context, all of its ancestors and,
            /// recursively, all of its children.
            /// </summary>
            public bool Closed
            {
                get
                {
                    return _closed;
                }
                set
                {
                    // Mark this context and the whole chain of ancestors.
                    for (Context ctx = this; ctx != null; ctx = ctx._parent)
                        ctx._closed = value;

                    // Propagate down; each child setter takes care of its own children.
                    if (_children != null)
                    {
                        foreach (Context child in _children)
                            child.Closed = value;
                    }
                }
            }
        }
    }
}
// http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableCollectionInfo.cs
// ----------------------------------------------------------------------
// diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableCollectionInfo.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableCollectionInfo.cs
// new file mode 100644
// index 0000000..fc61833
// --- /dev/null
// +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableCollectionInfo.cs
// @@ -0,0 +1,251 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Portable
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Reflection;
    using Apache.Ignite.Core.Impl.Common;

    /// <summary>
    /// Describes how a type is treated as a collection or dictionary: which of the four
    /// collection kinds it is (if any) and which handlers read and write it.
    /// </summary>
    internal class PortableCollectionInfo
    {
        /** Flag: none. */
        private const byte FlagNone = 0;

        /** Flag: generic dictionary. */
        private const byte FlagGenericDictionary = 1;

        /** Flag: generic collection. */
        private const byte FlagGenericCollection = 2;

        /** Flag: dictionary. */
        private const byte FlagDictionary = 3;

        /** Flag: collection. */
        private const byte FlagCollection = 4;

        /** Shared "none" instance. */
        private static readonly PortableCollectionInfo None =
            new PortableCollectionInfo(FlagNone, null, null, null);

        /** Shared "dictionary" instance. */
        private static readonly PortableCollectionInfo Dictionary =
            new PortableCollectionInfo(FlagDictionary, PortableSystemHandlers.WriteHndDictionary, null, null);

        /** Shared "collection" instance. */
        private static readonly PortableCollectionInfo Collection =
            new PortableCollectionInfo(FlagCollection, PortableSystemHandlers.WriteHndCollection, null, null);

        /** Infos resolved so far, per type. */
        private static readonly IDictionary<Type, PortableCollectionInfo> Infos =
            new ConcurrentDictionary<Type, PortableCollectionInfo>(64, 32);

        /** Flag. */
        private readonly byte _flag;

        /** Write handler. */
        private readonly PortableSystemWriteDelegate _writeHnd;

        /** Generic write func. */
        private readonly Action<object, PortableWriterImpl> _writeFunc;

        /** Generic read func. */
        private readonly Func<PortableReaderImpl, object, object> _readFunc;

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="flag0">Flag.</param>
        /// <param name="writeHnd0">Write handler.</param>
        /// <param name="writeMthd0">Generic write method; no write func is compiled when <c>null</c>.</param>
        /// <param name="readMthd0">Generic read method; no read func is compiled when <c>null</c>.</param>
        private PortableCollectionInfo(byte flag0, PortableSystemWriteDelegate writeHnd0,
            MethodInfo writeMthd0, MethodInfo readMthd0)
        {
            _flag = flag0;
            _writeHnd = writeHnd0;

            if (writeMthd0 != null)
            {
                _writeFunc = DelegateConverter.CompileFunc<Action<object, PortableWriterImpl>>(null, writeMthd0, null,
                    new[] {true, false, false});
            }

            if (readMthd0 != null)
            {
                _readFunc = DelegateConverter.CompileFunc<Func<PortableReaderImpl, object, object>>(null, readMthd0,
                    null, new[] {false, true, false});
            }
        }

        /// <summary>
        /// Get collection info for type, resolving and remembering it on first request.
        /// </summary>
        /// <param name="type">Type.</param>
        /// <returns>Collection info.</returns>
        public static PortableCollectionInfo Info(Type type)
        {
            PortableCollectionInfo known;

            if (Infos.TryGetValue(type, out known))
                return known;

            PortableCollectionInfo resolved = Info0(type);

            Infos[type] = resolved;

            return resolved;
        }

        /// <summary>
        /// Internal routine to get collection info for type. Generic dictionary is checked first,
        /// then generic collection, then the non-generic dictionary and collection.
        /// </summary>
        /// <param name="type">Type.</param>
        /// <returns>Collection info.</returns>
        private static PortableCollectionInfo Info0(Type type)
        {
            if (type.IsGenericType)
            {
                Type typeDef = type.GetGenericTypeDefinition();

                if (typeDef == PortableUtils.TypGenericDictionary)
                    return GenericDictionaryInfo(type.GetGenericArguments());

                Type dictIface = type.GetInterface(PortableUtils.TypGenericDictionary.FullName);

                if (dictIface != null)
                    return GenericDictionaryInfo(dictIface.GetGenericArguments());

                if (typeDef == PortableUtils.TypGenericCollection)
                    return GenericCollectionInfo(type.GetGenericArguments());

                Type colIface = type.GetInterface(PortableUtils.TypGenericCollection.FullName);

                if (colIface != null)
                    return GenericCollectionInfo(colIface.GetGenericArguments());
            }

            bool isDictionary = type == PortableUtils.TypDictionary ||
                type.GetInterface(PortableUtils.TypDictionary.FullName) != null;

            if (isDictionary)
                return Dictionary;

            bool isCollection = type == PortableUtils.TypCollection ||
                type.GetInterface(PortableUtils.TypCollection.FullName) != null;

            return isCollection ? Collection : None;
        }

        /// <summary>
        /// Creates info for a generic dictionary closed over the given type arguments.
        /// </summary>
        /// <param name="typeArgs">Generic type arguments.</param>
        /// <returns>Collection info.</returns>
        private static PortableCollectionInfo GenericDictionaryInfo(Type[] typeArgs)
        {
            MethodInfo writer = PortableUtils.MtdhWriteGenericDictionary.MakeGenericMethod(typeArgs);
            MethodInfo reader = PortableUtils.MtdhReadGenericDictionary.MakeGenericMethod(typeArgs);

            return new PortableCollectionInfo(FlagGenericDictionary,
                PortableSystemHandlers.WriteHndGenericDictionary, writer, reader);
        }

        /// <summary>
        /// Creates info for a generic collection closed over the given type arguments.
        /// </summary>
        /// <param name="typeArgs">Generic type arguments.</param>
        /// <returns>Collection info.</returns>
        private static PortableCollectionInfo GenericCollectionInfo(Type[] typeArgs)
        {
            MethodInfo writer = PortableUtils.MtdhWriteGenericCollection.MakeGenericMethod(typeArgs);
            MethodInfo reader = PortableUtils.MtdhReadGenericCollection.MakeGenericMethod(typeArgs);

            return new PortableCollectionInfo(FlagGenericCollection,
                PortableSystemHandlers.WriteHndGenericCollection, writer, reader);
        }

        /// <summary>
        /// Generic dictionary flag.
        /// </summary>
        public bool IsGenericDictionary
        {
            get { return _flag == FlagGenericDictionary; }
        }

        /// <summary>
        /// Generic collection flag.
        /// </summary>
        public bool IsGenericCollection
        {
            get { return _flag == FlagGenericCollection; }
        }

        /// <summary>
        /// Dictionary flag.
        /// </summary>
        public bool IsDictionary
        {
            get { return _flag == FlagDictionary; }
        }

        /// <summary>
        /// Collection flag.
        /// </summary>
        public bool IsCollection
        {
            get { return _flag == FlagCollection; }
        }

        /// <summary>
        /// Whether at least one flag is set.
        /// </summary>
        public bool IsAny
        {
            get { return _flag != FlagNone; }
        }

        /// <summary>
        /// Write handler.
        /// </summary>
        public PortableSystemWriteDelegate WriteHandler
        {
            get { return _writeHnd; }
        }

        /// <summary>
        /// Reads the generic collection using the compiled generic read func.
        /// </summary>
        /// <param name="reader">Reader.</param>
        /// <returns>Whatever the generic read func returns.</returns>
        public object ReadGeneric(PortableReaderImpl reader)
        {
            Debug.Assert(reader != null);
            Debug.Assert(_readFunc != null);

            return _readFunc(reader, null);
        }

        /// <summary>
        /// Writes the generic collection using the compiled generic write func.
        /// </summary>
        /// <param name="writer">Writer.</param>
        /// <param name="value">Collection to write.</param>
        public void WriteGeneric(PortableWriterImpl writer, object value)
        {
            Debug.Assert(writer != null);
            Debug.Assert(_writeFunc != null);

            _writeFunc(value, writer);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableFullTypeDescriptor.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableFullTypeDescriptor.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableFullTypeDescriptor.cs new file mode 100644 index 0000000..f294cbd --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableFullTypeDescriptor.cs @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using System; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Full type descriptor. + /// </summary> + internal class PortableFullTypeDescriptor : IPortableTypeDescriptor + { + /** Type. */ + private readonly Type _type; + + /** Type ID. */ + private readonly int _typeId; + + /** Type name. 
*/
        private readonly string _typeName;

        /** Flag telling whether this is a user type. */
        private readonly bool _userType;

        /** Name converter. */
        private readonly IPortableNameMapper _nameConverter;

        /** ID mapper. */
        private readonly IPortableIdMapper _mapper;

        /** Serializer. */
        private readonly IPortableSerializer _serializer;

        /** Flag telling whether metadata is enabled. */
        private readonly bool _metaEnabled;

        /** Whether to cache deserialized value in IPortableObject. */
        private readonly bool _keepDeserialized;

        /** Name of the affinity key field. */
        private readonly string _affKeyFieldName;

        /** Typed handler. */
        private readonly object _typedHandler;

        /** Untyped handler. */
        private readonly PortableSystemWriteDelegate _untypedHandler;

        /// <summary>
        /// Constructor. Every argument is stored as-is and exposed through the matching property.
        /// </summary>
        /// <param name="type">Type.</param>
        /// <param name="typeId">Type ID.</param>
        /// <param name="typeName">Type name.</param>
        /// <param name="userType">User type flag.</param>
        /// <param name="nameConverter">Name converter.</param>
        /// <param name="mapper">Mapper.</param>
        /// <param name="serializer">Serializer.</param>
        /// <param name="metaEnabled">Metadata enabled flag.</param>
        /// <param name="keepDeserialized">Whether to cache deserialized value in IPortableObject</param>
        /// <param name="affKeyFieldName">Affinity field key name.</param>
        /// <param name="typedHandler">Typed handler.</param>
        /// <param name="untypedHandler">Untyped handler.</param>
        public PortableFullTypeDescriptor(
            Type type,
            int typeId,
            string typeName,
            bool userType,
            IPortableNameMapper nameConverter,
            IPortableIdMapper mapper,
            IPortableSerializer serializer,
            bool metaEnabled,
            bool keepDeserialized,
            string affKeyFieldName,
            object typedHandler,
            PortableSystemWriteDelegate untypedHandler)
        {
            // Identity.
            _type = type;
            _typeId = typeId;
            _typeName = typeName;
            _userType = userType;

            // Mapping and serialization.
            _nameConverter = nameConverter;
            _mapper = mapper;
            _serializer = serializer;
            _affKeyFieldName = affKeyFieldName;

            // Behaviour switches.
            _metaEnabled = metaEnabled;
            _keepDeserialized = keepDeserialized;

            // Write handlers.
            _typedHandler = typedHandler;
            _untypedHandler = untypedHandler;
        }

        /// <summary>
        /// Type.
        /// </summary>
        public Type Type
        {
            get
            {
                return _type;
            }
        }

        /// <summary>
        /// Type ID.
        /// </summary>
        public int TypeId
        {
            get
            {
                return _typeId;
            }
        }

        /// <summary>
        /// Type name.
        /// </summary>
        public string TypeName
        {
            get
            {
                return _typeName;
            }
        }

        /// <summary>
        /// User type flag.
        /// </summary>
        public bool UserType
        {
            get
            {
                return _userType;
            }
        }

        /// <summary>
        /// Metadata enabled flag.
        /// </summary>
        public bool MetadataEnabled
        {
            get
            {
                return _metaEnabled;
            }
        }

        /// <summary>
        /// Whether to cache deserialized value in IPortableObject.
        /// </summary>
        public bool KeepDeserialized
        {
            get
            {
                return _keepDeserialized;
            }
        }

        /// <summary>
        /// Name converter.
        /// </summary>
        public IPortableNameMapper NameConverter
        {
            get
            {
                return _nameConverter;
            }
        }

        /// <summary>
        /// Mapper.
        /// </summary>
        public IPortableIdMapper Mapper
        {
            get
            {
                return _mapper;
            }
        }

        /// <summary>
        /// Serializer.
        /// </summary>
        public IPortableSerializer Serializer
        {
            get
            {
                return _serializer;
            }
        }

        /// <summary>
        /// Affinity key field name.
        /// </summary>
        public string AffinityKeyFieldName
        {
            get
            {
                return _affKeyFieldName;
            }
        }

        /// <summary>
        /// Typed handler.
        /// </summary>
        public object TypedHandler
        {
            get
            {
                return _typedHandler;
            }
        }

        /// <summary>
        /// Untyped handler.
+ /// </summary> + public PortableSystemWriteDelegate UntypedHandler + { + get { return _untypedHandler; } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableHandleDictionary.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableHandleDictionary.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableHandleDictionary.cs new file mode 100644 index 0000000..32e1e02 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableHandleDictionary.cs @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using System.Collections.Generic; + using System.Diagnostics; + using System.Diagnostics.CodeAnalysis; + + /// <summary> + /// Object handle dictionary. + /// </summary> + internal class PortableHandleDictionary<TK, TV> + { + /** Initial array sizes. */ + private const int InitialSize = 7; + + /** Dictionary. */ + private Dictionary<TK, TV> _dict; + + /** First key. 
*/
        private readonly TK _key1;

        /** Value stored with the first key. */
        private readonly TV _val1;

        /** Second key; equals EmptyKey while the slot is free. */
        private TK _key2;

        /** Value stored with the second key. */
        private TV _val2;

        /** Third key; equals EmptyKey while the slot is free. */
        private TK _key3;

        /** Value stored with the third key. */
        private TV _val3;

        /// <summary>
        /// Constructor with initial key-value pair. The pair occupies the first inline slot;
        /// the second and third slots are marked free with <c>EmptyKey</c>.
        /// </summary>
        /// <param name="key">Key.</param>
        /// <param name="val">Value.</param>
        [SuppressMessage("Microsoft.Usage", "CA2214:DoNotCallOverridableMethodsInConstructors"),
         SuppressMessage("ReSharper", "DoNotCallOverridableMethodsInConstructor")]
        public PortableHandleDictionary(TK key, TV val)
        {
            Debug.Assert(!Equals(key, EmptyKey));

            _key1 = key;
            _val1 = val;

            _key2 = EmptyKey;
            _key3 = EmptyKey;
        }

        /// <summary>
        /// Add value to dictionary. The pair goes into the first free inline slot (second, then
        /// third); once both are taken it is stored in the lazily created backing dictionary.
        /// </summary>
        /// <param name="key">Key.</param>
        /// <param name="val">Value.</param>
        public void Add(TK key, TV val)
        {
            Debug.Assert(!Equals(key, EmptyKey));

            if (Equals(_key2, EmptyKey))
            {
                _key2 = key;
                _val2 = val;
            }
            else if (Equals(_key3, EmptyKey))
            {
                _key3 = key;
                _val3 = val;
            }
            else
            {
                // Inline slots are exhausted: fall back to the dictionary, creating it on first use.
                (_dict ?? (_dict = new Dictionary<TK, TV>(InitialSize)))[key] = val;
            }
        }

        /// <summary>
        /// Try getting value for the given key. Inline slots are checked first, then the
        /// backing dictionary if it exists.
        /// </summary>
        /// <param name="key">Key.</param>
        /// <param name="val">Value; default(TV) when the key is not found.</param>
        /// <returns>True if key was found.</returns>
        public bool TryGetValue(TK key, out TV val)
        {
            Debug.Assert(!Equals(key, EmptyKey));

            if (Equals(key, _key1))
            {
                val = _val1;
            }
            else if (Equals(key, _key2))
            {
                val = _val2;
            }
            else if (Equals(key, _key3))
            {
                val = _val3;
            }
            else if (_dict != null)
            {
                return _dict.TryGetValue(key, out val);
            }
            else
            {
                val = default(TV);

                return false;
            }

            return true;
        }

        /// <summary>
        /// Merge data from another dictionary without overwrite.
+ /// </summary> + /// <param name="that">Other dictionary.</param> + public void Merge(PortableHandleDictionary<TK, TV> that) + { + Debug.Assert(that != null, "that == null"); + + AddIfAbsent(that._key1, that._val1); + AddIfAbsent(that._key2, that._val2); + AddIfAbsent(that._key3, that._val3); + + if (that._dict == null) + return; + + foreach (var pair in that._dict) + AddIfAbsent(pair.Key, pair.Value); + } + + /// <summary> + /// Add key/value pair to the bucket if absent. + /// </summary> + /// <param name="key">Key.</param> + /// <param name="val">Value.</param> + private void AddIfAbsent(TK key, TV val) + { + if (Equals(key, EmptyKey)) + return; + + if (Equals(key, _key1) || Equals(key, _key2) || Equals(key, _key3)) + return; + + if (_dict == null || !_dict.ContainsKey(key)) + Add(key, val); + } + + /// <summary> + /// Gets the empty key. + /// </summary> + protected virtual TK EmptyKey + { + get { return default(TK); } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshalAwareSerializer.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshalAwareSerializer.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshalAwareSerializer.cs new file mode 100644 index 0000000..e3c7523 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshalAwareSerializer.cs @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Portable serializer which only supports <see cref="IPortableMarshalAware"/> types with a default ctor. + /// Does not use reflection. + /// </summary> + internal class PortableMarshalAwareSerializer : IPortableSerializer + { + /// <summary> + /// Default instance. + /// </summary> + public static readonly PortableMarshalAwareSerializer Instance = new PortableMarshalAwareSerializer(); + + /** <inheritdoc /> */ + public void WritePortable(object obj, IPortableWriter writer) + { + ((IPortableMarshalAware)obj).WritePortable(writer); + } + + /** <inheritdoc /> */ + public void ReadPortable(object obj, IPortableReader reader) + { + ((IPortableMarshalAware)obj).ReadPortable(reader); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs new file mode 100644 index 0000000..4b933a0 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableMarshaller.cs @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Apache.Ignite.Core.Impl.Cache; + using Apache.Ignite.Core.Impl.Cache.Query.Continuous; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Compute; + using Apache.Ignite.Core.Impl.Compute.Closure; + using Apache.Ignite.Core.Impl.Datastream; + using Apache.Ignite.Core.Impl.Interop; + using Apache.Ignite.Core.Impl.Messaging; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Impl.Portable.Metadata; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Portable marshaller implementation. + /// </summary> + internal class PortableMarshaller + { + /** Portable configuration. */ + private readonly PortableConfiguration _cfg; + + /** Type to descriptor map. */ + private readonly IDictionary<Type, IPortableTypeDescriptor> _typeToDesc = + new Dictionary<Type, IPortableTypeDescriptor>(); + + /** Type name to descriptor map. */ + private readonly IDictionary<string, IPortableTypeDescriptor> _typeNameToDesc = + new Dictionary<string, IPortableTypeDescriptor>(); + + /** ID to descriptor map. 
*/
        private readonly IDictionary<long, IPortableTypeDescriptor> _idToDesc =
            new Dictionary<long, IPortableTypeDescriptor>();

        /** Cached metadata holders keyed by type ID. */
        private volatile IDictionary<int, PortableMetadataHolder> _metas =
            new Dictionary<int, PortableMetadataHolder>();

        /// <summary>
        /// Constructor. Validates the configuration, then registers predefined types, system
        /// types and finally the user types listed in the configuration.
        /// </summary>
        /// <param name="cfg">Configuration; a default one is created when null. The instance is
        /// modified: a missing type configuration list and a missing default serializer are filled in.</param>
        public PortableMarshaller(PortableConfiguration cfg)
        {
            // Validation.
            cfg = cfg ?? new PortableConfiguration();

            if (cfg.TypeConfigurations == null)
                cfg.TypeConfigurations = new List<PortableTypeConfiguration>();

            foreach (var typeCfg in cfg.TypeConfigurations)
            {
                if (string.IsNullOrEmpty(typeCfg.TypeName))
                    throw new PortableException("Type name cannot be null or empty: " + typeCfg);

                // A null assembly name is allowed, an empty one is not.
                if (typeCfg.AssemblyName != null && typeCfg.AssemblyName.Length == 0)
                    throw new PortableException("Assembly name cannot be empty string: " + typeCfg);
            }

            // Define predefined types: scalars first.
            AddPredefinedType(typeof(bool), PortableUtils.TypeBool,
                PortableSystemHandlers.WriteHndBoolTyped, PortableSystemHandlers.WriteHndBool);
            AddPredefinedType(typeof(byte), PortableUtils.TypeByte,
                PortableSystemHandlers.WriteHndByteTyped, PortableSystemHandlers.WriteHndByte);
            AddPredefinedType(typeof(short), PortableUtils.TypeShort,
                PortableSystemHandlers.WriteHndShortTyped, PortableSystemHandlers.WriteHndShort);
            AddPredefinedType(typeof(char), PortableUtils.TypeChar,
                PortableSystemHandlers.WriteHndCharTyped, PortableSystemHandlers.WriteHndChar);
            AddPredefinedType(typeof(int), PortableUtils.TypeInt,
                PortableSystemHandlers.WriteHndIntTyped, PortableSystemHandlers.WriteHndInt);
            AddPredefinedType(typeof(long), PortableUtils.TypeLong,
                PortableSystemHandlers.WriteHndLongTyped, PortableSystemHandlers.WriteHndLong);
            AddPredefinedType(typeof(float), PortableUtils.TypeFloat,
                PortableSystemHandlers.WriteHndFloatTyped, PortableSystemHandlers.WriteHndFloat);
            AddPredefinedType(typeof(double), PortableUtils.TypeDouble,
                PortableSystemHandlers.WriteHndDoubleTyped, PortableSystemHandlers.WriteHndDouble);
            AddPredefinedType(typeof(string), PortableUtils.TypeString,
                PortableSystemHandlers.WriteHndStringTyped, PortableSystemHandlers.WriteHndString);
            AddPredefinedType(typeof(decimal), PortableUtils.TypeDecimal,
                PortableSystemHandlers.WriteHndDecimalTyped, PortableSystemHandlers.WriteHndDecimal);
            AddPredefinedType(typeof(DateTime), PortableUtils.TypeDate,
                PortableSystemHandlers.WriteHndDateTyped, PortableSystemHandlers.WriteHndDate);
            AddPredefinedType(typeof(Guid), PortableUtils.TypeGuid,
                PortableSystemHandlers.WriteHndGuidTyped, PortableSystemHandlers.WriteHndGuid);

            // TODO: Remove this registration
            AddPredefinedType(typeof(PortableUserObject), PortableUtils.TypePortable,
                PortableSystemHandlers.WriteHndPortableTyped, PortableSystemHandlers.WriteHndPortable);

            // Predefined array types.
            AddPredefinedType(typeof(bool[]), PortableUtils.TypeArrayBool,
                PortableSystemHandlers.WriteHndBoolArrayTyped, PortableSystemHandlers.WriteHndBoolArray);
            AddPredefinedType(typeof(byte[]), PortableUtils.TypeArrayByte,
                PortableSystemHandlers.WriteHndByteArrayTyped, PortableSystemHandlers.WriteHndByteArray);
            AddPredefinedType(typeof(short[]), PortableUtils.TypeArrayShort,
                PortableSystemHandlers.WriteHndShortArrayTyped, PortableSystemHandlers.WriteHndShortArray);
            AddPredefinedType(typeof(char[]), PortableUtils.TypeArrayChar,
                PortableSystemHandlers.WriteHndCharArrayTyped, PortableSystemHandlers.WriteHndCharArray);
            AddPredefinedType(typeof(int[]), PortableUtils.TypeArrayInt,
                PortableSystemHandlers.WriteHndIntArrayTyped, PortableSystemHandlers.WriteHndIntArray);
            AddPredefinedType(typeof(long[]), PortableUtils.TypeArrayLong,
                PortableSystemHandlers.WriteHndLongArrayTyped, PortableSystemHandlers.WriteHndLongArray);
            AddPredefinedType(typeof(float[]), PortableUtils.TypeArrayFloat,
                PortableSystemHandlers.WriteHndFloatArrayTyped, PortableSystemHandlers.WriteHndFloatArray);
            AddPredefinedType(typeof(double[]), PortableUtils.TypeArrayDouble,
                PortableSystemHandlers.WriteHndDoubleArrayTyped, PortableSystemHandlers.WriteHndDoubleArray);
            AddPredefinedType(typeof(decimal[]), PortableUtils.TypeArrayDecimal,
                PortableSystemHandlers.WriteHndDecimalArrayTyped, PortableSystemHandlers.WriteHndDecimalArray);
            AddPredefinedType(typeof(string[]), PortableUtils.TypeArrayString,
                PortableSystemHandlers.WriteHndStringArrayTyped, PortableSystemHandlers.WriteHndStringArray);
            AddPredefinedType(typeof(DateTime?[]), PortableUtils.TypeArrayDate,
                PortableSystemHandlers.WriteHndDateArrayTyped, PortableSystemHandlers.WriteHndDateArray);
            AddPredefinedType(typeof(Guid?[]), PortableUtils.TypeArrayGuid,
                PortableSystemHandlers.WriteHndGuidArrayTyped, PortableSystemHandlers.WriteHndGuidArray);

            // Define system types. They use internal reflective stuff, so configuration doesn't affect them.
            AddSystemTypes();

            // Define user types. A reflective serializer is created only when the configuration
            // does not bring its own default serializer.
            PortableReflectiveSerializer dfltSerializer = null;

            if (cfg.DefaultSerializer == null)
                dfltSerializer = new PortableReflectiveSerializer();

            var typeResolver = new TypeResolver();

            ICollection<PortableTypeConfiguration> typeCfgs = cfg.TypeConfigurations;

            if (typeCfgs != null)
            {
                foreach (var typeCfg in typeCfgs)
                    AddUserType(cfg, typeCfg, typeResolver, dfltSerializer);
            }

            ICollection<string> types = cfg.Types;

            if (types != null)
            {
                foreach (var type in types)
                    AddUserType(cfg, new PortableTypeConfiguration(type), typeResolver, dfltSerializer);
            }

            if (cfg.DefaultSerializer == null)
                cfg.DefaultSerializer = dfltSerializer;

            _cfg = cfg;
        }

        /// <summary>
        /// Gets or sets the backing grid.
        /// </summary>
        public Ignite Ignite { get; set; }

        /// <summary>
        /// Marshal object into a new byte array.
        /// </summary>
        /// <param name="val">Value.</param>
        /// <returns>Serialized data as byte array.</returns>
        public byte[] Marshal(object val)
        {
            var stream = new PortableHeapStream(128);

            Marshal(val, stream);

            return stream.ArrayCopy();
        }

        /// <summary>
        /// Marshal object into the given stream: starts a marshal session, writes the value
        /// and finishes the session.
        /// </summary>
        /// <param name="val">Value.</param>
        /// <param name="stream">Output stream.</param>
        private void Marshal<T>(T val, IPortableStream stream)
        {
            var writer = StartMarshal(stream);

            writer.Write(val);

            FinishMarshal(writer);
        }

        /// <summary>
        /// Start marshal session.
        /// </summary>
        /// <param name="stream">Stream.</param>
        /// <returns>Writer bound to this marshaller and the given stream.</returns>
        public PortableWriterImpl StartMarshal(IPortableStream stream)
        {
            var writer = new PortableWriterImpl(this, stream);

            return writer;
        }

        /// <summary>
        /// Finish marshal session.
/// </summary>
        /// <param name="writer">Writer; it is cast to <see cref="PortableWriterImpl"/> unconditionally.</param>
        public void FinishMarshal(IPortableWriter writer)
        {
            var collected = ((PortableWriterImpl) writer).Metadata();

            // Snapshot the property so the null check and the call see the same instance.
            var grid = Ignite;

            if (grid == null || collected == null || collected.Count <= 0)
                return;

            grid.PutMetadata(collected);
        }

        /// <summary>
        /// Unmarshal object from a byte array.
        /// </summary>
        /// <typeparam name="T">Result type.</typeparam>
        /// <param name="data">Data array.</param>
        /// <param name="keepPortable">Whether to keep portables as portables.</param>
        /// <returns>
        /// Object.
        /// </returns>
        public T Unmarshal<T>(byte[] data, bool keepPortable)
        {
            var stream = new PortableHeapStream(data);

            return Unmarshal<T>(stream, keepPortable);
        }

        /// <summary>
        /// Unmarshal object from a byte array using the given mode.
        /// </summary>
        /// <typeparam name="T">Result type.</typeparam>
        /// <param name="data">Data array.</param>
        /// <param name="mode">The mode.</param>
        /// <returns>
        /// Object.
        /// </returns>
        public T Unmarshal<T>(byte[] data, PortableMode mode = PortableMode.Deserialize)
        {
            var stream = new PortableHeapStream(data);

            return Unmarshal<T>(stream, mode);
        }

        /// <summary>
        /// Unmarshal object from a stream.
        /// </summary>
        /// <typeparam name="T">Result type.</typeparam>
        /// <param name="stream">Stream over underlying byte array with correct position.</param>
        /// <param name="keepPortable">Whether to keep portables as portables.</param>
        /// <returns>
        /// Object.
        /// </returns>
        public T Unmarshal<T>(IPortableStream stream, bool keepPortable)
        {
            // Translate the boolean switch into the corresponding mode.
            PortableMode mode = PortableMode.Deserialize;

            if (keepPortable)
                mode = PortableMode.KeepPortable;

            return Unmarshal<T>(stream, mode, null);
        }

        /// <summary>
        /// Unmarshal object from a stream using the given mode and no builder.
        /// </summary>
        /// <typeparam name="T">Result type.</typeparam>
        /// <param name="stream">Stream over underlying byte array with correct position.</param>
        /// <param name="mode">The mode.</param>
        /// <returns>
        /// Object.
        /// </returns>
        public T Unmarshal<T>(IPortableStream stream, PortableMode mode = PortableMode.Deserialize)
        {
            PortableBuilderImpl noBuilder = null;

            return Unmarshal<T>(stream, mode, noBuilder);
        }

        /// <summary>
        /// Unmarshal object.
/// </summary>
        /// <typeparam name="T">Result type.</typeparam>
        /// <param name="stream">Stream over underlying byte array with correct position.</param>
        /// <param name="mode">The mode.</param>
        /// <param name="builder">Builder.</param>
        /// <returns>
        /// Object.
        /// </returns>
        public T Unmarshal<T>(IPortableStream stream, PortableMode mode, PortableBuilderImpl builder)
        {
            var reader = new PortableReaderImpl(this, _idToDesc, stream, mode, builder);

            return reader.Deserialize<T>();
        }

        /// <summary>
        /// Start unmarshal session.
        /// </summary>
        /// <param name="stream">Stream.</param>
        /// <param name="keepPortable">Whether to keep portables as portables.</param>
        /// <returns>
        /// Reader.
        /// </returns>
        public PortableReaderImpl StartUnmarshal(IPortableStream stream, bool keepPortable)
        {
            PortableMode mode = keepPortable ? PortableMode.KeepPortable : PortableMode.Deserialize;

            return new PortableReaderImpl(this, _idToDesc, stream, mode, null);
        }

        /// <summary>
        /// Start unmarshal session.
        /// </summary>
        /// <param name="stream">Stream.</param>
        /// <param name="mode">The mode.</param>
        /// <returns>Reader.</returns>
        public PortableReaderImpl StartUnmarshal(IPortableStream stream, PortableMode mode = PortableMode.Deserialize)
        {
            return new PortableReaderImpl(this, _idToDesc, stream, mode, null);
        }

        /// <summary>
        /// Gets metadata for the given type ID.
        /// </summary>
        /// <param name="typeId">Type ID.</param>
        /// <returns>Metadata reported by the grid, or <c>PortableMetadataImpl.EmptyMeta</c> when no
        /// grid is attached or the grid returns null.</returns>
        public IPortableMetadata Metadata(int typeId)
        {
            // Read the settable property once so the null check and the call use the same
            // instance (FinishMarshal snapshots it the same way).
            var ignite = Ignite;

            if (ignite != null)
            {
                IPortableMetadata meta = ignite.Metadata(typeId);

                if (meta != null)
                    return meta;
            }

            return PortableMetadataImpl.EmptyMeta;
        }

        /// <summary>
        /// Gets metadata handler for the given type ID.
/// </summary>
        /// <param name="desc">Type descriptor.</param>
        /// <returns>Metadata handler, or null when the holder cached for the type is null
        /// (a null holder is cached when the descriptor has metadata disabled).</returns>
        public IPortableMetadataHandler MetadataHandler(IPortableTypeDescriptor desc)
        {
            PortableMetadataHolder holder;

            if (!_metas.TryGetValue(desc.TypeId, out holder))
            {
                // Private gate instead of lock (this): code outside this class cannot take
                // the same monitor.
                lock (_metasSync)
                {
                    // Re-check under the lock: another thread may have added the holder meanwhile.
                    if (!_metas.TryGetValue(desc.TypeId, out holder))
                    {
                        // The published map is never mutated; a copy is extended and then swapped in.
                        IDictionary<int, PortableMetadataHolder> metas0 =
                            new Dictionary<int, PortableMetadataHolder>(_metas);

                        holder = desc.MetadataEnabled
                            ? new PortableMetadataHolder(desc.TypeId, desc.TypeName, desc.AffinityKeyFieldName)
                            : null;

                        metas0[desc.TypeId] = holder;

                        _metas = metas0;
                    }
                }
            }

            if (holder == null)
                return null;

            ICollection<int> ids = holder.FieldIds();

            bool newType = ids.Count == 0 && !holder.Saved();

            return new PortableHashsetMetadataHandler(ids, newType);
        }

        /** Gate serializing replacement of the metadata holder map. */
        private readonly object _metasSync = new object();

        /// <summary>
        /// Callback invoked when metadata has been sent to the server and acknowledged by it.
        /// Merges the field information of every entry into the holder cached for its type ID.
        /// </summary>
        /// <param name="newMetas">Sent metadata keyed by type ID. Values are cast to
        /// <see cref="PortableMetadataImpl"/>; the holder is fetched through the map indexer
        /// and used without a null check.</param>
        public void OnMetadataSent(IDictionary<int, IPortableMetadata> newMetas)
        {
            foreach (KeyValuePair<int, IPortableMetadata> metaEntry in newMetas)
            {
                var meta = (PortableMetadataImpl) metaEntry.Value;

                // Fetch the field map once per entry instead of once for sizing and once for iteration.
                var fields = meta.FieldsMap();

                IDictionary<int, Tuple<string, int>> mergeInfo =
                    new Dictionary<int, Tuple<string, int>>(fields.Count);

                foreach (KeyValuePair<string, int> fieldMeta in fields)
                {
                    int fieldId = PortableUtils.FieldId(metaEntry.Key, fieldMeta.Key, null, null);

                    mergeInfo[fieldId] = new Tuple<string, int>(fieldMeta.Key, fieldMeta.Value);
                }

                _metas[metaEntry.Key].Merge(mergeInfo);
            }
        }

        /// <summary>
        /// Gets descriptor for type.
/// </summary>
        /// <param name="type">Type.</param>
        /// <returns>Descriptor, or null when the type is not registered.</returns>
        public IPortableTypeDescriptor Descriptor(Type type)
        {
            IPortableTypeDescriptor found;

            return _typeToDesc.TryGetValue(type, out found) ? found : null;
        }

        /// <summary>
        /// Gets descriptor for type name.
        /// </summary>
        /// <param name="typeName">Type name.</param>
        /// <returns>Registered descriptor, or a new surrogate descriptor built from the
        /// configuration and the name when none is registered.</returns>
        public IPortableTypeDescriptor Descriptor(string typeName)
        {
            IPortableTypeDescriptor found;

            if (_typeNameToDesc.TryGetValue(typeName, out found))
                return found;

            return new PortableSurrogateTypeDescriptor(_cfg, typeName);
        }

        /// <summary>
        /// Gets descriptor for the user type flag and type ID pair.
        /// </summary>
        /// <param name="userType">User type flag.</param>
        /// <param name="typeId">Type ID.</param>
        /// <returns>Registered descriptor; otherwise a new surrogate descriptor for user types
        /// and null for non-user types.</returns>
        public IPortableTypeDescriptor Descriptor(bool userType, int typeId)
        {
            long typeKey = PortableUtils.TypeKey(userType, typeId);

            IPortableTypeDescriptor found;

            if (_idToDesc.TryGetValue(typeKey, out found))
                return found;

            if (userType)
                return new PortableSurrogateTypeDescriptor(_cfg, typeId);

            return null;
        }

        /// <summary>
        /// Add user type. Per-type settings take precedence over the configuration defaults.
        /// When the type cannot be resolved it is still registered by its simple name, without
        /// a CLR type and without a serializer.
        /// </summary>
        /// <param name="cfg">Configuration.</param>
        /// <param name="typeCfg">Type configuration.</param>
        /// <param name="typeResolver">The type resolver.</param>
        /// <param name="dfltSerializer">The default serializer.</param>
        private void AddUserType(PortableConfiguration cfg, PortableTypeConfiguration typeCfg,
            TypeResolver typeResolver, IPortableSerializer dfltSerializer)
        {
            // Get converter/mapper/flags, falling back to the configuration-wide defaults.
            IPortableNameMapper nameMapper = typeCfg.NameMapper ?? cfg.DefaultNameMapper;

            IPortableIdMapper idMapper = typeCfg.IdMapper ?? cfg.DefaultIdMapper;

            bool metaEnabled = typeCfg.MetadataEnabled ?? cfg.DefaultMetadataEnabled;

            bool keepDeserialized = typeCfg.KeepDeserialized ?? cfg.DefaultKeepDeserialized;

            // Try resolving type.
            Type type = typeResolver.ResolveType(typeCfg.TypeName, typeCfg.AssemblyName);

            string typeName = type != null
                ? GetTypeName(type)
                : PortableUtils.SimpleTypeName(typeCfg.TypeName);

            int typeId = PortableUtils.TypeId(typeName, nameMapper, idMapper);

            // A serializer is chosen only for resolved types.
            IPortableSerializer serializer = null;

            if (type != null)
            {
                serializer = typeCfg.Serializer ?? cfg.DefaultSerializer
                             ?? GetPortableMarshalAwareSerializer(type) ?? dfltSerializer;

                var reflective = serializer as PortableReflectiveSerializer;

                if (reflective != null)
                    reflective.Register(type, typeId, nameMapper, idMapper);
            }

            AddType(type, typeId, typeName, true, metaEnabled, keepDeserialized, nameMapper, idMapper, serializer,
                typeCfg.AffinityKeyFieldName, null, null);
        }

        /// <summary>
        /// Gets the <see cref="PortableMarshalAwareSerializer"/> for a type if it is compatible.
        /// </summary>
        /// <param name="type">The type.</param>
        /// <returns>Resulting <see cref="PortableMarshalAwareSerializer"/>, or null when the type's
        /// interface list does not contain <see cref="IPortableMarshalAware"/>.</returns>
        private static IPortableSerializer GetPortableMarshalAwareSerializer(Type type)
        {
            bool marshalAware = type.GetInterfaces().Contains(typeof (IPortableMarshalAware));

            if (marshalAware)
                return PortableMarshalAwareSerializer.Instance;

            return null;
        }

        /// <summary>
        /// Add predefined type. It is registered as a non-user type with metadata and value
        /// caching disabled and without mappers, serializer or affinity key field.
        /// </summary>
        /// <param name="type">Type.</param>
        /// <param name="typeId">Type ID.</param>
        /// <param name="typedHandler">Typed handler.</param>
        /// <param name="untypedHandler">Untyped handler.</param>
        private void AddPredefinedType(Type type, int typeId, object typedHandler,
            PortableSystemWriteDelegate untypedHandler)
        {
            string typeName = GetTypeName(type);

            AddType(type, typeId, typeName, false, false, false, null, null, null, null, typedHandler,
                untypedHandler);
        }

        /// <summary>
        /// Add type.
/// </summary>
        /// <param name="type">Type.</param>
        /// <param name="typeId">Type ID.</param>
        /// <param name="typeName">Type name.</param>
        /// <param name="userType">User type flag.</param>
        /// <param name="metaEnabled">Metadata enabled flag.</param>
        /// <param name="keepDeserialized">Whether to cache deserialized value in IPortableObject</param>
        /// <param name="nameMapper">Name mapper.</param>
        /// <param name="idMapper">ID mapper.</param>
        /// <param name="serializer">Serializer.</param>
        /// <param name="affKeyFieldName">Affinity key field name.</param>
        /// <param name="typedHandler">Typed handler.</param>
        /// <param name="untypedHandler">Untyped handler.</param>
        /// <exception cref="PortableException">A descriptor with the same type key already exists,
        /// or a user type with the same name is already registered.</exception>
        private void AddType(Type type, int typeId, string typeName, bool userType, bool metaEnabled,
            bool keepDeserialized, IPortableNameMapper nameMapper, IPortableIdMapper idMapper,
            IPortableSerializer serializer, string affKeyFieldName, object typedHandler,
            PortableSystemWriteDelegate untypedHandler)
        {
            long typeKey = PortableUtils.TypeKey(userType, typeId);

            // Single lookup instead of ContainsKey followed by two indexer reads.
            IPortableTypeDescriptor existing;

            if (_idToDesc.TryGetValue(typeKey, out existing))
            {
                string type1 = existing.Type != null ? existing.Type.AssemblyQualifiedName : null;
                string type2 = type != null ? type.AssemblyQualifiedName : null;

                throw new PortableException("Conflicting type IDs [type1=" + type1 + ", type2=" + type2 +
                    ", typeId=" + typeId + ']');
            }

            // Name uniqueness is enforced for user types only.
            if (userType && _typeNameToDesc.ContainsKey(typeName))
                throw new PortableException("Conflicting type name: " + typeName);

            IPortableTypeDescriptor descriptor =
                new PortableFullTypeDescriptor(type, typeId, typeName, userType, nameMapper, idMapper, serializer,
                    metaEnabled, keepDeserialized, affKeyFieldName, typedHandler, untypedHandler);

            // Both checks passed: publish the descriptor in every applicable index.
            if (type != null)
                _typeToDesc[type] = descriptor;

            if (userType)
                _typeNameToDesc[typeName] = descriptor;

            _idToDesc[typeKey] = descriptor;
        }

        /// <summary>
        /// Adds a predefined system type.
/// </summary>
/// <typeparam name="T">System type to register; must implement <see cref="IPortableWriteAware"/>.</typeparam>
/// <param name="typeId">Type ID.</param>
/// <param name="ctor">
/// Delegate producing a <typeparamref name="T"/> from a <see cref="PortableReaderImpl"/>; it is handed to
/// the <see cref="PortableSystemTypeSerializer{T}"/> created for the type.
/// </param>
private void AddSystemType<T>(byte typeId, Func<PortableReaderImpl, T> ctor) where T : IPortableWriteAware
{
    var systemType = typeof(T);

    var systemSerializer = new PortableSystemTypeSerializer<T>(ctor);

    AddType(systemType, typeId, GetTypeName(systemType),
        userType: false, metaEnabled: false, keepDeserialized: false,
        nameMapper: null, idMapper: null, serializer: systemSerializer, affKeyFieldName: null,
        typedHandler: null, untypedHandler: null);
}

/// <summary>
/// Registers every predefined system type, pairing each type ID from <see cref="PortableUtils"/> with the
/// delegate that creates the corresponding object from a reader.
/// </summary>
private void AddSystemTypes()
{
    AddSystemType(PortableUtils.TypeNativeJobHolder, reader => new ComputeJobHolder(reader));
    AddSystemType(PortableUtils.TypeComputeJobWrapper, reader => new ComputeJobWrapper(reader));
    AddSystemType(PortableUtils.TypePortableJobResHolder, reader => new PortableResultWrapper(reader));
    AddSystemType(PortableUtils.TypeDotNetCfg, reader => new InteropDotNetConfiguration(reader));
    AddSystemType(PortableUtils.TypeDotNetPortableCfg,
        reader => new InteropDotNetPortableConfiguration(reader));
    AddSystemType(PortableUtils.TypeDotNetPortableTypCfg,
        reader => new InteropDotNetPortableTypeConfiguration(reader));

    // IgniteProxy is created without consulting the reader.
    AddSystemType(PortableUtils.TypeIgniteProxy, reader => new IgniteProxy());

    AddSystemType(PortableUtils.TypeComputeOutFuncJob, reader => new ComputeOutFuncJob(reader));
    AddSystemType(PortableUtils.TypeComputeOutFuncWrapper, reader => new ComputeOutFuncWrapper(reader));
    AddSystemType(PortableUtils.TypeComputeFuncWrapper, reader => new ComputeFuncWrapper(reader));
    AddSystemType(PortableUtils.TypeComputeFuncJob, reader => new ComputeFuncJob(reader));
    AddSystemType(PortableUtils.TypeComputeActionJob, reader => new ComputeActionJob(reader));
    AddSystemType(PortableUtils.TypeContinuousQueryRemoteFilterHolder,
        reader => new ContinuousQueryFilterHolder(reader));
    AddSystemType(PortableUtils.TypeSerializableHolder, reader => new SerializableObjectHolder(reader));
    AddSystemType(PortableUtils.TypeCacheEntryProcessorHolder,
        reader => new CacheEntryProcessorHolder(reader));
    AddSystemType(PortableUtils.TypeCacheEntryPredicateHolder, reader => new CacheEntryFilterHolder(reader));
    AddSystemType(PortableUtils.TypeMessageFilterHolder, reader => new MessageFilterHolder(reader));
    AddSystemType(PortableUtils.TypePortableOrSerializableHolder,
        reader => new PortableOrSerializableObjectHolder(reader));
    AddSystemType(PortableUtils.TypeStreamReceiverHolder, reader => new StreamReceiverHolder(reader));
}

/// <summary>
/// Builds the name under which a type is registered.
/// </summary>
/// <param name="type">The type.</param>
/// <returns>
/// <see cref="System.Reflection.MemberInfo.Name"/> of the type when it is not generic; for a generic type,
/// that name followed by the comma-separated names of its generic arguments (computed recursively) in
/// square brackets.
/// </returns>
private static string GetTypeName(Type type)
{
    if (type.IsGenericType)
    {
        var argNames = type.GetGenericArguments().Select(GetTypeName).ToArray();

        return string.Format("{0}[{1}]", type.Name, string.Join(",", argNames));
    }

    return type.Name;
}
}
}
