// Source: http://git-wip-us.apache.org/repos/asf/ignite/blob/ec58b87c/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs
// diff --cc modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderImpl.cs
// index 0000000,c65038c..17d7cc6 mode 000000,100644..100644
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Portable
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics.CodeAnalysis;
    using System.IO;
    using Apache.Ignite.Core.Common;
    using Apache.Ignite.Core.Impl.Portable.IO;
    using Apache.Ignite.Core.Impl.Portable.Metadata;
    using Apache.Ignite.Core.Portable;

    /// <summary>
    /// Portable builder implementation. Wraps an existing portable object and produces a new one
    /// by copying its serialized form while replacing, adding or removing individual fields.
    /// </summary>
    internal class PortableBuilderImpl : IPortableBuilder
    {
        /** Type IDs for metadata. */
        private static readonly IDictionary<Type, int> TypeIds;

        /** Cached dictionary with no values. Must never be mutated: it is shared by all builders. */
        private static readonly IDictionary<int, object> EmptyVals = new Dictionary<int, object>();

        /**
         * Offset of the length field from the start of a full object, as written by Mutate0:
         * header (1) + protocol version (1) + user flag (1) + type ID (4) + hash code (4).
         */
        private const int OffsetLen = 11;

        /** Portables. */
        private readonly PortablesImpl _portables;

        /** Root builder of the builder group; refers to this instance when no parent was given. */
        private readonly PortableBuilderImpl _parent;

        /** Initial portable object. */
        private readonly PortableUserObject _obj;

        /** Type descriptor. */
        private readonly IPortableTypeDescriptor _desc;

        /** Values set, removed or read through this builder, keyed by field name. */
        private IDictionary<string, PortableBuilderField> _vals;

        /** Contextual fields, keyed by position in the initial object's data. */
        private IDictionary<int, object> _cache;

        /** Hash code. */
        private int _hashCode;

        /** Current context. */
        private Context _ctx;

        /// <summary>
        /// Static initializer. Fills the map from well-known CLR types to metadata type IDs.
        /// </summary>
        [SuppressMessage("Microsoft.Performance", "CA1810:InitializeReferenceTypeStaticFieldsInline")]
        static PortableBuilderImpl()
        {
            TypeIds = new Dictionary<Type, int>();

            // 1. Primitives.
            TypeIds[typeof(byte)] = PortableUtils.TypeByte;
            TypeIds[typeof(bool)] = PortableUtils.TypeBool;
            TypeIds[typeof(short)] = PortableUtils.TypeShort;
            TypeIds[typeof(char)] = PortableUtils.TypeChar;
            TypeIds[typeof(int)] = PortableUtils.TypeInt;
            TypeIds[typeof(long)] = PortableUtils.TypeLong;
            TypeIds[typeof(float)] = PortableUtils.TypeFloat;
            TypeIds[typeof(double)] = PortableUtils.TypeDouble;
            TypeIds[typeof(decimal)] = PortableUtils.TypeDecimal;

            TypeIds[typeof(byte[])] = PortableUtils.TypeArrayByte;
            TypeIds[typeof(bool[])] = PortableUtils.TypeArrayBool;
            TypeIds[typeof(short[])] = PortableUtils.TypeArrayShort;
            TypeIds[typeof(char[])] = PortableUtils.TypeArrayChar;
            TypeIds[typeof(int[])] = PortableUtils.TypeArrayInt;
            TypeIds[typeof(long[])] = PortableUtils.TypeArrayLong;
            TypeIds[typeof(float[])] = PortableUtils.TypeArrayFloat;
            TypeIds[typeof(double[])] = PortableUtils.TypeArrayDouble;
            TypeIds[typeof(decimal?[])] = PortableUtils.TypeArrayDecimal;

            // 2. String.
            TypeIds[typeof(string)] = PortableUtils.TypeString;
            TypeIds[typeof(string[])] = PortableUtils.TypeArrayString;

            // 3. Guid.
            TypeIds[typeof(Guid)] = PortableUtils.TypeGuid;
            TypeIds[typeof(Guid?)] = PortableUtils.TypeGuid;
            TypeIds[typeof(Guid[])] = PortableUtils.TypeArrayGuid;
            TypeIds[typeof(Guid?[])] = PortableUtils.TypeArrayGuid;

            // 4. Date.
            TypeIds[typeof(DateTime)] = PortableUtils.TypeDate;
            TypeIds[typeof(DateTime?)] = PortableUtils.TypeDate;
            TypeIds[typeof(DateTime[])] = PortableUtils.TypeArrayDate;
            TypeIds[typeof(DateTime?[])] = PortableUtils.TypeArrayDate;
        }

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="portables">Portables.</param>
        /// <param name="parent">Parent builder; when <c>null</c> this builder becomes its own parent.</param>
        /// <param name="obj">Initial portable object.</param>
        /// <param name="desc">Type descriptor.</param>
        public PortableBuilderImpl(PortablesImpl portables, PortableBuilderImpl parent,
            PortableUserObject obj, IPortableTypeDescriptor desc)
        {
            _portables = portables;
            _parent = parent ?? this;
            _obj = obj;
            _desc = desc;

            // The built object keeps the hash code of the initial one unless SetHashCode is called.
            _hashCode = obj.GetHashCode();
        }

        /** <inheritDoc /> */
        public IPortableBuilder SetHashCode(int hashCode)
        {
            _hashCode = hashCode;

            return this;
        }

        /** <inheritDoc /> */
        public T GetField<T>(string name)
        {
            PortableBuilderField field;

            // A value set (or removed) through this builder wins over the initial object.
            if (_vals != null && _vals.TryGetValue(name, out field))
                return field != PortableBuilderField.RmvMarker ? (T)field.Value : default(T);

            T val = _obj.Field<T>(name, this);

            if (_vals == null)
                _vals = new Dictionary<string, PortableBuilderField>(2);

            // Remember the value that was read so that it is written back on build.
            _vals[name] = new PortableBuilderField(typeof(T), val);

            return val;
        }

        /** <inheritDoc /> */
        public IPortableBuilder SetField<T>(string name, T val)
        {
            return SetField0(name, new PortableBuilderField(typeof(T), val));
        }

        /** <inheritDoc /> */
        public IPortableBuilder RemoveField(string name)
        {
            return SetField0(name, PortableBuilderField.RmvMarker);
        }

        /** <inheritDoc /> */
        public IPortableObject Build()
        {
            // Assume that resulting length will be no less than header + [fields_cnt] * 12;
            int len = PortableUtils.FullHdrLen + (_vals == null ? 0 : _vals.Count * 12);

            PortableHeapStream outStream = new PortableHeapStream(len);

            PortableWriterImpl writer = _portables.Marshaller.StartMarshal(outStream);

            writer.Builder(this);

            // All related builders will work in this context with this writer.
            _parent._ctx = new Context(writer);

            try
            {
                // Write.
                writer.Write(this);

                // Process metadata.
                _portables.Marshaller.FinishMarshal(writer);

                // Create portable object once metadata is processed.
                return new PortableUserObject(_portables.Marshaller, outStream.InternalArray, 0,
                    _desc.TypeId, _hashCode);
            }
            finally
            {
                // Cleanup.
                _parent._ctx.Closed = true;
            }
        }

        /// <summary>
        /// Create child builder.
        /// </summary>
        /// <param name="obj">Portable object.</param>
        /// <returns>Child builder.</returns>
        public PortableBuilderImpl Child(PortableUserObject obj)
        {
            var desc = _portables.Marshaller.GetDescriptor(true, obj.TypeId);

            return new PortableBuilderImpl(_portables, null, obj, desc);
        }

        /// <summary>
        /// Get cached field.
        /// </summary>
        /// <param name="pos">Position.</param>
        /// <param name="val">Value; <c>default(T)</c> when nothing is cached or the cached value is null.</param>
        /// <returns><c>true</c> if value is found in cache.</returns>
        public bool CachedField<T>(int pos, out T val)
        {
            if (_parent._cache != null)
            {
                object res;

                if (_parent._cache.TryGetValue(pos, out res))
                {
                    val = res != null ? (T)res : default(T);

                    return true;
                }
            }

            val = default(T);

            return false;
        }

        /// <summary>
        /// Add field to cache. The cache lives in the parent builder and is created on first use.
        /// </summary>
        /// <param name="pos">Position.</param>
        /// <param name="val">Value.</param>
        public void CacheField(int pos, object val)
        {
            if (_parent._cache == null)
                _parent._cache = new Dictionary<int, object>(2);

            _parent._cache[pos] = val;
        }

        /// <summary>
        /// Internal set field routine.
        /// </summary>
        /// <param name="fieldName">Name.</param>
        /// <param name="val">Value.</param>
        /// <returns>This builder.</returns>
        private IPortableBuilder SetField0(string fieldName, PortableBuilderField val)
        {
            if (_vals == null)
                _vals = new Dictionary<string, PortableBuilderField>();

            _vals[fieldName] = val;

            return this;
        }

        /// <summary>
        /// Mutate portable object.
        /// </summary>
        /// <param name="inStream">Input stream with initial object.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="desc">Portable type descriptor.</param>
        /// <param name="hashCode">Hash code.</param>
        /// <param name="vals">Values.</param>
        /// <exception cref="IgniteException">If two field names resolve to the same field ID.</exception>
        internal void Mutate(
            PortableHeapStream inStream,
            PortableHeapStream outStream,
            IPortableTypeDescriptor desc,
            int hashCode,
            IDictionary<string, PortableBuilderField> vals)
        {
            // Set correct builder to writer frame.
            PortableBuilderImpl oldBuilder = _parent._ctx.Writer.Builder(_parent);

            int streamPos = inStream.Position;

            try
            {
                // Prepare fields.
                IPortableMetadataHandler metaHnd = _portables.Marshaller.GetMetadataHandler(desc);

                IDictionary<int, object> vals0;

                if (vals == null || vals.Count == 0)
                    vals0 = EmptyVals;
                else
                {
                    // Re-key values by field ID; Mutate0 removes entries from this map as it consumes them.
                    vals0 = new Dictionary<int, object>(vals.Count);

                    foreach (KeyValuePair<string, PortableBuilderField> valEntry in vals)
                    {
                        int fieldId = PortableUtils.FieldId(desc.TypeId, valEntry.Key, desc.NameConverter, desc.Mapper);

                        if (vals0.ContainsKey(fieldId))
                            throw new IgniteException("Collision in field ID detected (change field name or " +
                                "define custom ID mapper) [fieldName=" + valEntry.Key + ", fieldId=" + fieldId + ']');

                        vals0[fieldId] = valEntry.Value.Value;

                        // Write metadata if: 1) it is enabled for type; 2) type is not null (i.e. it is neither
                        // remove marker, nor a field read through "GetField" method.
                        if (metaHnd != null && valEntry.Value.Type != null)
                            metaHnd.OnFieldWrite(fieldId, valEntry.Key, TypeId(valEntry.Value.Type));
                    }
                }

                // Actual processing.
                Mutate0(_parent._ctx, inStream, outStream, true, hashCode, vals0);

                // 3. Handle metadata.
                if (metaHnd != null)
                {
                    IDictionary<string, int> meta = metaHnd.OnObjectWriteFinished();

                    if (meta != null)
                        _parent._ctx.Writer.SaveMetadata(desc.TypeId, desc.TypeName, desc.AffinityKeyFieldName, meta);
                }
            }
            finally
            {
                // Restore builder frame.
                _parent._ctx.Writer.Builder(oldBuilder);

                inStream.Seek(streamPos, SeekOrigin.Begin);
            }
        }

        /// <summary>
        /// Internal mutation routine. Copies one serialized value from the input stream to the
        /// output stream, recursing into nested values and rewriting handles so that they stay
        /// valid in the output.
        /// </summary>
        /// <param name="ctx">Context.</param>
        /// <param name="inStream">Input stream.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="changeHash">Whether hash should be changed.</param>
        /// <param name="hash">New hash.</param>
        /// <param name="vals">Values to be replaced, keyed by field ID; consumed entries are removed.</param>
        /// <exception cref="IgniteException">If the value header is not recognized.</exception>
        private void Mutate0(Context ctx, PortableHeapStream inStream, IPortableStream outStream,
            bool changeHash, int hash, IDictionary<int, object> vals)
        {
            int inStartPos = inStream.Position;
            int outStartPos = outStream.Position;

            byte inHdr = inStream.ReadByte();

            if (inHdr == PortableUtils.HdrNull)
                outStream.WriteByte(PortableUtils.HdrNull);
            else if (inHdr == PortableUtils.HdrHnd)
            {
                int inHnd = inStream.ReadInt();

                // Handle value is the distance back from the handle to the referenced object.
                int oldPos = inStartPos - inHnd;
                int newPos;

                if (ctx.OldToNew(oldPos, out newPos))
                {
                    // Handle is still valid.
                    outStream.WriteByte(PortableUtils.HdrHnd);
                    outStream.WriteInt(outStartPos - newPos);
                }
                else
                {
                    // Handle is invalid, write full object.
                    int inRetPos = inStream.Position;

                    inStream.Seek(oldPos, SeekOrigin.Begin);

                    Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);

                    inStream.Seek(inRetPos, SeekOrigin.Begin);
                }
            }
            else if (inHdr == PortableUtils.HdrFull)
            {
                PortableUtils.ValidateProtocolVersion(inStream);

                byte inUsrFlag = inStream.ReadByte();
                int inTypeId = inStream.ReadInt();
                int inHash = inStream.ReadInt();
                int inLen = inStream.ReadInt();
                int inRawOff = inStream.ReadInt();

                int hndPos;

                if (ctx.AddOldToNew(inStartPos, outStartPos, out hndPos))
                {
                    // Object could be cached in parent builder.
                    object cachedVal;

                    if (_parent._cache != null && _parent._cache.TryGetValue(inStartPos, out cachedVal))
                    {
                        ctx.Writer.Write(cachedVal);
                    }
                    else
                    {
                        // New object, write in full form.
                        outStream.WriteByte(PortableUtils.HdrFull);
                        outStream.WriteByte(PortableUtils.ProtoVer);
                        outStream.WriteByte(inUsrFlag);
                        outStream.WriteInt(inTypeId);
                        outStream.WriteInt(changeHash ? hash : inHash);

                        // Skip length and raw offset as they are not known at this point.
                        // They are back-patched below at outStartPos + OffsetLen.
                        outStream.Seek(8, SeekOrigin.Current);

                        // Write regular fields.
                        while (inStream.Position < inStartPos + inRawOff)
                        {
                            int inFieldId = inStream.ReadInt();
                            int inFieldLen = inStream.ReadInt();
                            int inFieldDataPos = inStream.Position;

                            object fieldVal;

                            bool fieldFound = vals.TryGetValue(inFieldId, out fieldVal);

                            // A field mapped to the remove marker is dropped from the output.
                            if (!fieldFound || fieldVal != PortableBuilderField.RmvMarkerObj)
                            {
                                outStream.WriteInt(inFieldId);

                                int fieldLenPos = outStream.Position; // Here we will write length later.

                                outStream.Seek(4, SeekOrigin.Current);

                                if (fieldFound)
                                {
                                    // Replace field with new value and mark it as consumed.
                                    ctx.Writer.Write(fieldVal);

                                    vals.Remove(inFieldId);
                                }
                                else
                                {
                                    // If field was requested earlier, then we must write tracked value
                                    if (_parent._cache != null && _parent._cache.TryGetValue(inFieldDataPos, out fieldVal))
                                        ctx.Writer.Write(fieldVal);
                                    else
                                        // Field is not tracked, re-write as is.
                                        Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
                                }

                                int fieldEndPos = outStream.Position;

                                outStream.Seek(fieldLenPos, SeekOrigin.Begin);
                                outStream.WriteInt(fieldEndPos - fieldLenPos - 4);
                                outStream.Seek(fieldEndPos, SeekOrigin.Begin);
                            }

                            // Position input stream pointer after the field.
                            inStream.Seek(inFieldDataPos + inFieldLen, SeekOrigin.Begin);
                        }

                        // Write remaining new fields.
                        foreach (KeyValuePair<int, object> valEntry in vals)
                        {
                            if (valEntry.Value != PortableBuilderField.RmvMarkerObj)
                            {
                                outStream.WriteInt(valEntry.Key);

                                int fieldLenPos = outStream.Position; // Here we will write length later.

                                outStream.Seek(4, SeekOrigin.Current);

                                ctx.Writer.Write(valEntry.Value);

                                int fieldEndPos = outStream.Position;

                                outStream.Seek(fieldLenPos, SeekOrigin.Begin);
                                outStream.WriteInt(fieldEndPos - fieldLenPos - 4);
                                outStream.Seek(fieldEndPos, SeekOrigin.Begin);
                            }
                        }

                        // Write raw data.
                        int rawPos = outStream.Position;

                        outStream.Write(inStream.InternalArray, inStartPos + inRawOff, inLen - inRawOff);

                        // Write length and raw data offset.
                        int outResPos = outStream.Position;

                        outStream.Seek(outStartPos + OffsetLen, SeekOrigin.Begin);

                        outStream.WriteInt(outResPos - outStartPos); // Length.
                        outStream.WriteInt(rawPos - outStartPos); // Raw offset.

                        outStream.Seek(outResPos, SeekOrigin.Begin);
                    }
                }
                else
                {
                    // Object has already been written, write as handle.
                    outStream.WriteByte(PortableUtils.HdrHnd);
                    outStream.WriteInt(outStartPos - hndPos);
                }

                // Synchronize input stream position.
                inStream.Seek(inStartPos + inLen, SeekOrigin.Begin);
            }
            else
            {
                // Try writing as well-known type with fixed size.
                outStream.WriteByte(inHdr);

                if (!WriteAsPredefined(inHdr, inStream, outStream, ctx))
                    throw new IgniteException("Unexpected header [position=" + (inStream.Position - 1) +
                        ", header=" + inHdr + ']');
            }
        }

        /// <summary>
        /// Process portable object inverting handles if needed.
        /// </summary>
        /// <param name="outStream">Output stream.</param>
        /// <param name="port">Portable.</param>
        internal void ProcessPortable(IPortableStream outStream, PortableUserObject port)
        {
            // Special case: writing portable object with correct inversions.
            PortableHeapStream inStream = new PortableHeapStream(port.Data);

            inStream.Seek(port.Offset, SeekOrigin.Begin);

            // Use fresh context to ensure correct portable inversion.
            Mutate0(new Context(), inStream, outStream, false, 0, EmptyVals);
        }

        /// <summary>
        /// Process child builder.
        /// </summary>
        /// <param name="outStream">Output stream; must be a <see cref="PortableHeapStream"/>.</param>
        /// <param name="builder">Builder.</param>
        internal void ProcessBuilder(IPortableStream outStream, PortableBuilderImpl builder)
        {
            PortableHeapStream inStream = new PortableHeapStream(builder._obj.Data);

            inStream.Seek(builder._obj.Offset, SeekOrigin.Begin);

            // Builder parent context might be null only in one case: if we never met this group of
            // builders before. In this case we set context to their parent and track it. Context
            // cleanup will be performed at the very end of build process.
            if (builder._parent._ctx == null || builder._parent._ctx.Closed)
                builder._parent._ctx = new Context(_parent._ctx);

            // Direct cast: a stream of the wrong type fails here instead of surfacing later as a
            // null reference in the middle of mutation.
            builder.Mutate(inStream, (PortableHeapStream)outStream, builder._desc,
                builder._hashCode, builder._vals);
        }

        /// <summary>
        /// Write object as a predefined type if possible. The header byte itself has already been
        /// written by the caller; this method transfers only the payload.
        /// </summary>
        /// <param name="hdr">Header.</param>
        /// <param name="inStream">Input stream.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="ctx">Context.</param>
        /// <returns><c>True</c> if was written.</returns>
        private bool WriteAsPredefined(byte hdr, PortableHeapStream inStream, IPortableStream outStream,
            Context ctx)
        {
            switch (hdr)
            {
                case PortableUtils.TypeByte:
                    TransferBytes(inStream, outStream, 1);

                    break;

                case PortableUtils.TypeShort:
                    TransferBytes(inStream, outStream, 2);

                    break;

                case PortableUtils.TypeInt:
                    TransferBytes(inStream, outStream, 4);

                    break;

                case PortableUtils.TypeLong:
                    TransferBytes(inStream, outStream, 8);

                    break;

                case PortableUtils.TypeFloat:
                    TransferBytes(inStream, outStream, 4);

                    break;

                case PortableUtils.TypeDouble:
                    TransferBytes(inStream, outStream, 8);

                    break;

                case PortableUtils.TypeChar:
                    TransferBytes(inStream, outStream, 2);

                    break;

                case PortableUtils.TypeBool:
                    TransferBytes(inStream, outStream, 1);

                    break;

                case PortableUtils.TypeDecimal:
                    TransferBytes(inStream, outStream, 4); // Transfer scale

                    int magLen = inStream.ReadInt(); // Transfer magnitude length.

                    outStream.WriteInt(magLen);

                    TransferBytes(inStream, outStream, magLen); // Transfer magnitude.

                    break;

                case PortableUtils.TypeString:
                    PortableUtils.WriteString(PortableUtils.ReadString(inStream), outStream);

                    break;

                case PortableUtils.TypeGuid:
                    TransferBytes(inStream, outStream, 16);

                    break;

                case PortableUtils.TypeDate:
                    TransferBytes(inStream, outStream, 12);

                    break;

                case PortableUtils.TypeArrayByte:
                    TransferArray(inStream, outStream, 1);

                    break;

                case PortableUtils.TypeArrayShort:
                    TransferArray(inStream, outStream, 2);

                    break;

                case PortableUtils.TypeArrayInt:
                    TransferArray(inStream, outStream, 4);

                    break;

                case PortableUtils.TypeArrayLong:
                    TransferArray(inStream, outStream, 8);

                    break;

                case PortableUtils.TypeArrayFloat:
                    TransferArray(inStream, outStream, 4);

                    break;

                case PortableUtils.TypeArrayDouble:
                    TransferArray(inStream, outStream, 8);

                    break;

                case PortableUtils.TypeArrayChar:
                    TransferArray(inStream, outStream, 2);

                    break;

                case PortableUtils.TypeArrayBool:
                    TransferArray(inStream, outStream, 1);

                    break;

                case PortableUtils.TypeArrayDecimal:
                case PortableUtils.TypeArrayString:
                case PortableUtils.TypeArrayGuid:
                case PortableUtils.TypeArrayDate:
                case PortableUtils.TypeArrayEnum:
                case PortableUtils.TypeArray:
                    int arrLen = inStream.ReadInt();

                    outStream.WriteInt(arrLen);

                    // Elements may be full objects, whose fields are looked up in the values map,
                    // so an empty map (not null) must be passed.
                    for (int i = 0; i < arrLen; i++)
                        Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);

                    break;

                case PortableUtils.TypeCollection:
                    int colLen = inStream.ReadInt();

                    outStream.WriteInt(colLen);

                    // Single byte following the length is copied as is.
                    outStream.WriteByte(inStream.ReadByte());

                    for (int i = 0; i < colLen; i++)
                        Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);

                    break;

                case PortableUtils.TypeDictionary:
                    int dictLen = inStream.ReadInt();

                    outStream.WriteInt(dictLen);

                    // Single byte following the length is copied as is.
                    outStream.WriteByte(inStream.ReadByte());

                    for (int i = 0; i < dictLen; i++)
                    {
                        // Key, then value.
                        Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
                        Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
                    }

                    break;

                case PortableUtils.TypeMapEntry:
                    Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);
                    Mutate0(ctx, inStream, outStream, false, 0, EmptyVals);

                    break;

                case PortableUtils.TypePortable:
                    TransferArray(inStream, outStream, 1); // Data array.
                    TransferBytes(inStream, outStream, 4); // Offset in array.

                    break;

                case PortableUtils.TypeEnum:
                    TransferBytes(inStream, outStream, 4); // Integer ordinal.

                    break;

                default:
                    return false;
            }

            return true;
        }

        /// <summary>
        /// Gets metadata field type ID for the given type.
        /// </summary>
        /// <param name="type">Type.</param>
        /// <returns>Type ID.</returns>
        private static int TypeId(Type type)
        {
            int typeId;

            if (TypeIds.TryGetValue(type, out typeId))
                return typeId;

            if (type.IsEnum)
                return PortableUtils.TypeEnum;

            if (type.IsArray)
                return type.GetElementType().IsEnum ? PortableUtils.TypeArrayEnum : PortableUtils.TypeArray;

            PortableCollectionInfo colInfo = PortableCollectionInfo.Info(type);

            // Neither collection nor dictionary: plain object.
            if (!colInfo.IsAny)
                return PortableUtils.TypeObject;

            return colInfo.IsCollection || colInfo.IsGenericCollection
                ? PortableUtils.TypeCollection
                : PortableUtils.TypeDictionary;
        }

        /// <summary>
        /// Transfer bytes from one stream to another.
        /// </summary>
        /// <param name="inStream">Input stream.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="cnt">Bytes count.</param>
        private static void TransferBytes(PortableHeapStream inStream, IPortableStream outStream, int cnt)
        {
            // Copy straight from the input's backing array, then advance the input position.
            outStream.Write(inStream.InternalArray, inStream.Position, cnt);

            inStream.Seek(cnt, SeekOrigin.Current);
        }

        /// <summary>
        /// Transfer array of fixed-size elements from one stream to another.
        /// </summary>
        /// <param name="inStream">Input stream.</param>
        /// <param name="outStream">Output stream.</param>
        /// <param name="elemSize">Element size.</param>
        private static void TransferArray(PortableHeapStream inStream, IPortableStream outStream,
            int elemSize)
        {
            int len = inStream.ReadInt();

            outStream.WriteInt(len);

            TransferBytes(inStream, outStream, elemSize * len);
        }

        /// <summary>
        /// Mutation context.
        /// </summary>
        private class Context
        {
            /** Map from object position in old portable to position in new portable. Created lazily. */
            private IDictionary<int, int> _oldToNew;

            /** Parent context. */
            private readonly Context _parent;

            /** Portable writer. */
            private readonly PortableWriterImpl _writer;

            /** Children contexts. Created lazily. */
            private ICollection<Context> _children;

            /** Closed flag; if context is closed, it can no longer be used. */
            private bool _closed;

            /// <summary>
            /// Constructor for parent context where writer invocation is not expected.
            /// </summary>
            public Context()
            {
                // No-op.
            }

            /// <summary>
            /// Constructor for parent context.
            /// </summary>
            /// <param name="writer">Writer</param>
            public Context(PortableWriterImpl writer)
            {
                _writer = writer;
            }

            /// <summary>
            /// Constructor for child context. Shares the parent's writer and registers itself
            /// in the parent's children collection.
            /// </summary>
            /// <param name="parent">Parent context.</param>
            public Context(Context parent)
            {
                _parent = parent;

                _writer = parent._writer;

                if (parent._children == null)
                    parent._children = new List<Context>();

                parent._children.Add(this);
            }

            /// <summary>
            /// Add another old-to-new position mapping.
            /// </summary>
            /// <param name="oldPos">Old position.</param>
            /// <param name="newPos">New position.</param>
            /// <param name="hndPos">Handle position.</param>
            /// <returns><c>True</c> if mapping was added, <c>false</c> if mapping already existed and handle
            /// position in the new object is returned.</returns>
            public bool AddOldToNew(int oldPos, int newPos, out int hndPos)
            {
                if (_oldToNew == null)
                    _oldToNew = new Dictionary<int, int>();

                if (_oldToNew.TryGetValue(oldPos, out hndPos))
                    return false;

                _oldToNew[oldPos] = newPos;

                return true;
            }

            /// <summary>
            /// Get mapping of old position to the new one.
            /// </summary>
            /// <param name="oldPos">Old position.</param>
            /// <param name="newPos">New position.</param>
            /// <returns><c>True</c> if mapping exists.</returns>
            public bool OldToNew(int oldPos, out int newPos)
            {
                // The map is created lazily by AddOldToNew; no map means no mappings.
                if (_oldToNew == null)
                {
                    newPos = 0;

                    return false;
                }

                return _oldToNew.TryGetValue(oldPos, out newPos);
            }

            /// <summary>
            /// Writer.
            /// </summary>
            public PortableWriterImpl Writer
            {
                get { return _writer; }
            }

            /// <summary>
            /// Closed flag. Setting it updates this context, all of its ancestors and all of
            /// its descendants.
            /// </summary>
            public bool Closed
            {
                get
                {
                    return _closed;
                }
                set
                {
                    // Ancestors (including this context).
                    for (Context ctx = this; ctx != null; ctx = ctx._parent)
                        ctx._closed = value;

                    // Descendants.
                    SetClosedForChildren(value);
                }
            }

            /// <summary>
            /// Recursively sets the closed flag on all descendants of this context.
            /// </summary>
            /// <param name="value">Flag value.</param>
            private void SetClosedForChildren(bool value)
            {
                if (_children == null)
                    return;

                foreach (Context child in _children)
                {
                    child._closed = value;

                    child.SetClosedForChildren(value);
                }
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ec58b87c/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableReaderImpl.cs ---------------------------------------------------------------------- diff --cc modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableReaderImpl.cs index 0000000,fe5f5c9..ff9aa34 mode 000000,100644..100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableReaderImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableReaderImpl.cs @@@ -1,0 -1,1017 +1,1020 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + namespace Apache.Ignite.Core.Impl.Portable + { + using System; + using System.Collections; + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using System.IO; + using System.Runtime.Serialization; + using Apache.Ignite.Core.Impl.Common; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Portable reader implementation. + /// </summary> + internal class PortableReaderImpl : IPortableReader, IPortableRawReader + { + /** Marshaller. */ + private readonly PortableMarshaller _marsh; + + /** Type descriptors. 
*/ + private readonly IDictionary<long, IPortableTypeDescriptor> _descs; + + /** Parent builder. */ + private readonly PortableBuilderImpl _builder; + + /** Handles. */ + private PortableReaderHandleDictionary _hnds; + + /** Current type ID. */ + private int _curTypeId; + + /** Current position. */ + private int _curPos; + + /** Current raw data offset. */ + private int _curRawOffset; + + /** Current converter. */ + private IPortableNameMapper _curConverter; + + /** Current mapper. */ + private IPortableIdMapper _curMapper; + + /** Current raw flag. */ + private bool _curRaw; + + /** Detach flag. */ + private bool _detach; + + /** Portable read mode. */ + private PortableMode _mode; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="marsh">Marshaller.</param> + /// <param name="descs">Descriptors.</param> + /// <param name="stream">Input stream.</param> + /// <param name="mode">The mode.</param> + /// <param name="builder">Builder.</param> + public PortableReaderImpl + (PortableMarshaller marsh, + IDictionary<long, IPortableTypeDescriptor> descs, + IPortableStream stream, + PortableMode mode, + PortableBuilderImpl builder) + { + _marsh = marsh; + _descs = descs; + _mode = mode; + _builder = builder; + + Stream = stream; + } + + /// <summary> + /// Gets the marshaller. 
        /// </summary>
        public PortableMarshaller Marshaller
        {
            get { return _marsh; }
        }

        // Reading conventions used by the members below:
        //  * Overloads taking a field name go through ReadField, which seeks the named field,
        //    consumes its one-byte header and only then invokes the supplied delegate; when the
        //    field is absent or its header is HDR_NULL they yield default(T).
        //  * Parameterless primitive overloads (ReadBoolean, ReadByte, ReadShort, ...) read the
        //    value straight from the stream and do not consume a header byte.
        //  * Parameterless overloads that delegate to Read consume one header byte first and
        //    yield default(T) when that byte is HDR_NULL.

        /** <inheritdoc /> */
        public IPortableRawReader GetRawReader()
        {
            // Switch the current frame to raw mode; this instance doubles as the raw reader.
            MarkRaw();

            return this;
        }

        /** <inheritdoc /> */
        public bool ReadBoolean(string fieldName)
        {
            return ReadField(fieldName, r => r.ReadBoolean());
        }

        /** <inheritdoc /> */
        public bool ReadBoolean()
        {
            return Stream.ReadBool();
        }

        /** <inheritdoc /> */
        public bool[] ReadBooleanArray(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadBooleanArray);
        }

        /** <inheritdoc /> */
        public bool[] ReadBooleanArray()
        {
            return Read(PortableUtils.ReadBooleanArray);
        }

        /** <inheritdoc /> */
        public byte ReadByte(string fieldName)
        {
            // Method group: presumably binds to the parameterless ReadByte() via the Func<T>
            // overload of ReadField (the same pattern is used for the other primitives below).
            return ReadField(fieldName, ReadByte);
        }

        /** <inheritdoc /> */
        public byte ReadByte()
        {
            return Stream.ReadByte();
        }

        /** <inheritdoc /> */
        public byte[] ReadByteArray(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadByteArray);
        }

        /** <inheritdoc /> */
        public byte[] ReadByteArray()
        {
            return Read(PortableUtils.ReadByteArray);
        }

        /** <inheritdoc /> */
        public short ReadShort(string fieldName)
        {
            return ReadField(fieldName, ReadShort);
        }

        /** <inheritdoc /> */
        public short ReadShort()
        {
            return Stream.ReadShort();
        }

        /** <inheritdoc /> */
        public short[] ReadShortArray(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadShortArray);
        }

        /** <inheritdoc /> */
        public short[] ReadShortArray()
        {
            return Read(PortableUtils.ReadShortArray);
        }

        /** <inheritdoc /> */
        public char ReadChar(string fieldName)
        {
            return ReadField(fieldName, ReadChar);
        }

        /** <inheritdoc /> */
        public char ReadChar()
        {
            return Stream.ReadChar();
        }

        /** <inheritdoc /> */
        public char[] ReadCharArray(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadCharArray);
        }

        /** <inheritdoc /> */
        public char[] ReadCharArray()
        {
            return Read(PortableUtils.ReadCharArray);
        }

        /** <inheritdoc /> */
        public int ReadInt(string fieldName)
        {
            return ReadField(fieldName, ReadInt);
        }

        /** <inheritdoc /> */
        public int ReadInt()
        {
            return Stream.ReadInt();
        }

        /** <inheritdoc /> */
        public int[] ReadIntArray(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadIntArray);
        }

        /** <inheritdoc /> */
        public int[] ReadIntArray()
        {
            return Read(PortableUtils.ReadIntArray);
        }

        /** <inheritdoc /> */
        public long ReadLong(string fieldName)
        {
            return ReadField(fieldName, ReadLong);
        }

        /** <inheritdoc /> */
        public long ReadLong()
        {
            return Stream.ReadLong();
        }

        /** <inheritdoc /> */
        public long[] ReadLongArray(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadLongArray);
        }

        /** <inheritdoc /> */
        public long[] ReadLongArray()
        {
            return Read(PortableUtils.ReadLongArray);
        }

        /** <inheritdoc /> */
        public float ReadFloat(string fieldName)
        {
            return ReadField(fieldName, ReadFloat);
        }

        /** <inheritdoc /> */
        public float ReadFloat()
        {
            return Stream.ReadFloat();
        }

        /** <inheritdoc /> */
        public float[] ReadFloatArray(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadFloatArray);
        }

        /** <inheritdoc /> */
        public float[] ReadFloatArray()
        {
            return Read(PortableUtils.ReadFloatArray);
        }

        /** <inheritdoc /> */
        public double ReadDouble(string fieldName)
        {
            return ReadField(fieldName, ReadDouble);
        }

        /** <inheritdoc /> */
        public double ReadDouble()
        {
            return Stream.ReadDouble();
        }

        /** <inheritdoc /> */
        public double[] ReadDoubleArray(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadDoubleArray);
        }

        /** <inheritdoc /> */
        public double[] ReadDoubleArray()
        {
            return Read(PortableUtils.ReadDoubleArray);
        }

        /** <inheritdoc /> */
        public decimal? ReadDecimal(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadDecimal);
        }

        /** <inheritdoc /> */
        public decimal? ReadDecimal()
        {
            return Read(PortableUtils.ReadDecimal);
        }

        /** <inheritdoc /> */
        public decimal?[] ReadDecimalArray(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadDecimalArray);
        }

        /** <inheritdoc /> */
        public decimal?[] ReadDecimalArray()
        {
            return Read(PortableUtils.ReadDecimalArray);
        }

        /** <inheritdoc /> */
        public DateTime? ReadDate(string fieldName)
        {
            // Same as the two-argument overload with local = false.
            return ReadDate(fieldName, false);
        }

        /** <inheritdoc /> */
        public DateTime? ReadDate(string fieldName, bool local)
        {
            return ReadField(fieldName, r => PortableUtils.ReadDate(r, local));
        }

        /** <inheritdoc /> */
        public DateTime? ReadDate()
        {
            // Same as ReadDate(bool) with local = false.
            return ReadDate(false);
        }

        /** <inheritdoc /> */
        public DateTime? ReadDate(bool local)
        {
            return Read(r => PortableUtils.ReadDate(r, local));
        }

        /** <inheritdoc /> */
        public DateTime?[] ReadDateArray(string fieldName)
        {
            // Same as the two-argument overload with local = false.
            return ReadDateArray(fieldName, false);
        }

        /** <inheritdoc /> */
        public DateTime?[] ReadDateArray(string fieldName, bool local)
        {
            return ReadField(fieldName, r => PortableUtils.ReadDateArray(r, local));
        }

        /** <inheritdoc /> */
        public DateTime?[] ReadDateArray()
        {
            // Same as ReadDateArray(bool) with local = false.
            return ReadDateArray(false);
        }

        /** <inheritdoc /> */
        public DateTime?[] ReadDateArray(bool local)
        {
            return Read(r => PortableUtils.ReadDateArray(r, local));
        }

        /** <inheritdoc /> */
        public string ReadString(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadString);
        }

        /** <inheritdoc /> */
        public string ReadString()
        {
            return Read(PortableUtils.ReadString);
        }

        /** <inheritdoc /> */
        public string[] ReadStringArray(string fieldName)
        {
            return ReadField(fieldName, r => PortableUtils.ReadGenericArray<string>(r, false));
        }

        /** <inheritdoc /> */
        public string[] ReadStringArray()
        {
            return Read(r => PortableUtils.ReadGenericArray<string>(r, false));
        }

        /** <inheritdoc /> */
        public Guid? ReadGuid(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadGuid);
        }

        /** <inheritdoc /> */
        public Guid? ReadGuid()
        {
            return Read(PortableUtils.ReadGuid);
        }

        /** <inheritdoc /> */
        public Guid?[] ReadGuidArray(string fieldName)
        {
            return ReadField(fieldName, r => PortableUtils.ReadGenericArray<Guid?>(r, false));
        }

        /** <inheritdoc /> */
        public Guid?[] ReadGuidArray()
        {
            return Read(r => PortableUtils.ReadGenericArray<Guid?>(r, false));
        }

        /** <inheritdoc /> */
        public T ReadEnum<T>(string fieldName)
        {
            return ReadField(fieldName, PortableUtils.ReadEnum<T>);
        }

        /** <inheritdoc /> */
        public T ReadEnum<T>()
        {
            return Read(PortableUtils.ReadEnum<T>);
        }

        /** <inheritdoc /> */
        public T[] ReadEnumArray<T>(string fieldName)
        {
            return ReadField(fieldName, r => PortableUtils.ReadGenericArray<T>(r, true));
        }

        /** <inheritdoc /> */
        public T[] ReadEnumArray<T>()
        {
            return Read(r => PortableUtils.ReadGenericArray<T>(r, true));
        }

        /** <inheritdoc /> */
        public T ReadObject<T>(string fieldName)
        {
            // Named reads are rejected once the current frame has switched to raw mode.
            if (_curRaw)
                throw new PortableException("Cannot read named fields after raw data is read.");

            int fieldId = PortableUtils.FieldId(_curTypeId, fieldName, _curConverter, _curMapper);

            // Unlike the ReadField helpers, the header byte is NOT consumed here:
            // Deserialize reads and interprets it itself.
            if (SeekField(fieldId))
                return Deserialize<T>();

            // Field is not present in the current object.
            return default(T);
        }

        /** <inheritdoc /> */
        public T ReadObject<T>()
        {
            return Deserialize<T>();
        }

        /** <inheritdoc /> */
        public T[] ReadObjectArray<T>(string fieldName)
        {
            return ReadField(fieldName, r => PortableUtils.ReadGenericArray<T>(r, true));
        }

        /** <inheritdoc /> */
        public T[] ReadObjectArray<T>()
        {
            return Read(r => PortableUtils.ReadGenericArray<T>(r, true));
        }

        /** <inheritdoc /> */
        public ICollection ReadCollection(string fieldName)
        {
            // No custom factory and no custom adder.
            return ReadCollection(fieldName, null, null);
        }

/** <inheritdoc /> */ + public ICollection ReadCollection() + { + return ReadCollection(null, null); + } + + /** <inheritdoc /> */ + public ICollection ReadCollection(string fieldName, PortableCollectionFactory factory, + PortableCollectionAdder adder) + { + return ReadField(fieldName, r => PortableUtils.ReadCollection(r, factory, adder)); + } + + /** <inheritdoc /> */ + public ICollection ReadCollection(PortableCollectionFactory factory, + PortableCollectionAdder adder) + { + return Read(r => PortableUtils.ReadCollection(r, factory, adder)); + } + + /** <inheritdoc /> */ + public ICollection<T> ReadGenericCollection<T>(string fieldName) + { + return ReadGenericCollection<T>(fieldName, null); + } + + /** <inheritdoc /> */ + public ICollection<T> ReadGenericCollection<T>() + { + return ReadGenericCollection((PortableGenericCollectionFactory<T>) null); + } + + /** <inheritdoc /> */ + public ICollection<T> ReadGenericCollection<T>(string fieldName, + PortableGenericCollectionFactory<T> factory) + { + return ReadField(fieldName, r => PortableUtils.ReadGenericCollection(r, factory)); + } + + /** <inheritdoc /> */ + public ICollection<T> ReadGenericCollection<T>(PortableGenericCollectionFactory<T> factory) + { + return Read(r => PortableUtils.ReadGenericCollection(r, factory)); + } + + /** <inheritdoc /> */ + public IDictionary ReadDictionary(string fieldName) + { + return ReadDictionary(fieldName, null); + } + + /** <inheritdoc /> */ + public IDictionary ReadDictionary() + { + return ReadDictionary((PortableDictionaryFactory)null); + } + + /** <inheritdoc /> */ + public IDictionary ReadDictionary(string fieldName, PortableDictionaryFactory factory) + { + return ReadField(fieldName, r => PortableUtils.ReadDictionary(r, factory)); + } + + /** <inheritdoc /> */ + public IDictionary ReadDictionary(PortableDictionaryFactory factory) + { + return Read(r => PortableUtils.ReadDictionary(r, factory)); + } + + /** <inheritdoc /> */ + public IDictionary<TK, TV> 
ReadGenericDictionary<TK, TV>(string fieldName) + { + return ReadGenericDictionary<TK, TV>(fieldName, null); + } + + /** <inheritdoc /> */ + public IDictionary<TK, TV> ReadGenericDictionary<TK, TV>() + { + return ReadGenericDictionary((PortableGenericDictionaryFactory<TK, TV>) null); + } + + /** <inheritdoc /> */ + public IDictionary<TK, TV> ReadGenericDictionary<TK, TV>(string fieldName, + PortableGenericDictionaryFactory<TK, TV> factory) + { + return ReadField(fieldName, r => PortableUtils.ReadGenericDictionary(r, factory)); + } + + /** <inheritdoc /> */ + public IDictionary<TK, TV> ReadGenericDictionary<TK, TV>(PortableGenericDictionaryFactory<TK, TV> factory) + { + return Read(r => PortableUtils.ReadGenericDictionary(r, factory)); + } + + /// <summary> + /// Enable detach mode for the next object read. + /// </summary> + public void DetachNext() + { + _detach = true; + } + + /// <summary> + /// Deserialize object. + /// </summary> + /// <returns>Deserialized object.</returns> + public T Deserialize<T>() + { + int pos = Stream.Position; + + byte hdr = Stream.ReadByte(); + + var doDetach = _detach; // save detach flag into a var and reset so it does not go deeper + + _detach = false; + + switch (hdr) + { + case PortableUtils.HdrNull: + if (default(T) != null) + throw new PortableException(string.Format("Invalid data on deserialization. " + + "Expected: '{0}' But was: null", typeof (T))); + + return default(T); + + case PortableUtils.HdrHnd: + return ReadHandleObject<T>(pos); + + case PortableUtils.HdrFull: + return ReadFullObject<T>(pos); + + case PortableUtils.TypePortable: + return ReadPortableObject<T>(doDetach); + } + + if (PortableUtils.IsPredefinedType(hdr)) + return PortableSystemHandlers.ReadSystemType<T>(hdr, this); + + throw new PortableException("Invalid header on deserialization [pos=" + pos + ", hdr=" + hdr + ']'); + } + + /// <summary> + /// Reads the portable object. 
+ /// </summary> + private T ReadPortableObject<T>(bool doDetach) + { + var len = Stream.ReadInt(); + + var portablePos = Stream.Position; + + if (_mode != PortableMode.Deserialize) + return TypeCaster<T>.Cast(ReadAsPortable(portablePos, len, doDetach)); + + Stream.Seek(len, SeekOrigin.Current); + + var offset = Stream.ReadInt(); + + var retPos = Stream.Position; + + Stream.Seek(portablePos + offset, SeekOrigin.Begin); + + _mode = PortableMode.KeepPortable; + + try + { + return Deserialize<T>(); + } + finally + { + _mode = PortableMode.Deserialize; + + Stream.Seek(retPos, SeekOrigin.Begin); + } + } + + /// <summary> + /// Reads the portable object in portable form. + /// </summary> + private PortableUserObject ReadAsPortable(int dataPos, int dataLen, bool doDetach) + { + try + { + Stream.Seek(dataLen + dataPos, SeekOrigin.Begin); + + var offs = Stream.ReadInt(); // offset inside data + + var pos = dataPos + offs; + + if (!doDetach) + return GetPortableUserObject(pos, pos, Stream.Array()); + - Stream.Seek(pos + 10, SeekOrigin.Begin); ++ Stream.Seek(pos + PortableUtils.OffsetLen, SeekOrigin.Begin); + + var len = Stream.ReadInt(); + + Stream.Seek(pos, SeekOrigin.Begin); + + return GetPortableUserObject(pos, 0, Stream.ReadByteArray(len)); + } + finally + { + Stream.Seek(dataPos + dataLen + 4, SeekOrigin.Begin); + } + } + + /// <summary> + /// Reads the full object. + /// </summary> + [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "hashCode")] + private T ReadFullObject<T>(int pos) + { ++ // Validate protocol version. ++ PortableUtils.ValidateProtocolVersion(Stream); ++ + // Read header. + bool userType = Stream.ReadBool(); + int typeId = Stream.ReadInt(); + // ReSharper disable once UnusedVariable + int hashCode = Stream.ReadInt(); + int len = Stream.ReadInt(); + int rawOffset = Stream.ReadInt(); + + try + { + // Already read this object? 
+ object hndObj; + + if (_hnds != null && _hnds.TryGetValue(pos, out hndObj)) + return (T) hndObj; + + if (userType && _mode == PortableMode.ForcePortable) + { + PortableUserObject portObj; + + if (_detach) + { + Stream.Seek(pos, SeekOrigin.Begin); + + portObj = GetPortableUserObject(pos, 0, Stream.ReadByteArray(len)); + } + else + portObj = GetPortableUserObject(pos, pos, Stream.Array()); + + T obj = _builder == null ? TypeCaster<T>.Cast(portObj) : TypeCaster<T>.Cast(_builder.Child(portObj)); + + AddHandle(pos, obj); + + return obj; + } + else + { + // Find descriptor. + IPortableTypeDescriptor desc; + + if (!_descs.TryGetValue(PortableUtils.TypeKey(userType, typeId), out desc)) + throw new PortableException("Unknown type ID: " + typeId); + + // Instantiate object. + if (desc.Type == null) + throw new PortableException("No matching type found for object [typeId=" + + desc.TypeId + ", typeName=" + desc.TypeName + ']'); + + // Preserve old frame. + int oldTypeId = _curTypeId; + int oldPos = _curPos; + int oldRawOffset = _curRawOffset; + IPortableNameMapper oldConverter = _curConverter; + IPortableIdMapper oldMapper = _curMapper; + bool oldRaw = _curRaw; + + // Set new frame. + _curTypeId = typeId; + _curPos = pos; + _curRawOffset = rawOffset; + _curConverter = desc.NameConverter; + _curMapper = desc.Mapper; + _curRaw = false; + + // Read object. + object obj; + + var sysSerializer = desc.Serializer as IPortableSystemTypeSerializer; + + if (sysSerializer != null) + obj = sysSerializer.ReadInstance(this); + else + { + try + { + obj = FormatterServices.GetUninitializedObject(desc.Type); + + // Save handle. + AddHandle(pos, obj); + } + catch (Exception e) + { + throw new PortableException("Failed to create type instance: " + + desc.Type.AssemblyQualifiedName, e); + } + + desc.Serializer.ReadPortable(obj, this); + } + + // Restore old frame. 
+ _curTypeId = oldTypeId; + _curPos = oldPos; + _curRawOffset = oldRawOffset; + _curConverter = oldConverter; + _curMapper = oldMapper; + _curRaw = oldRaw; + + var wrappedSerializable = obj as SerializableObjectHolder; + + return wrappedSerializable != null ? (T) wrappedSerializable.Item : (T) obj; + } + } + finally + { + // Advance stream pointer. + Stream.Seek(pos + len, SeekOrigin.Begin); + } + } + + /// <summary> + /// Reads the handle object. + /// </summary> + private T ReadHandleObject<T>(int pos) + { + // Get handle position. + int hndPos = pos - Stream.ReadInt(); + + int retPos = Stream.Position; + + try + { + object hndObj; + + if (_builder == null || !_builder.CachedField(hndPos, out hndObj)) + { + if (_hnds == null || !_hnds.TryGetValue(hndPos, out hndObj)) + { + // No such handler, i.e. we trying to deserialize inner object before deserializing outer. + Stream.Seek(hndPos, SeekOrigin.Begin); + + hndObj = Deserialize<T>(); + } + + // Notify builder that we deserialized object on other location. + if (_builder != null) + _builder.CacheField(hndPos, hndObj); + } + + return (T) hndObj; + } + finally + { + // Position stream to correct place. + Stream.Seek(retPos, SeekOrigin.Begin); + } + } + + /// <summary> + /// Adds a handle to the dictionary. + /// </summary> + /// <param name="pos">Position.</param> + /// <param name="obj">Object.</param> + private void AddHandle(int pos, object obj) + { + if (_hnds == null) + _hnds = new PortableReaderHandleDictionary(pos, obj); + else + _hnds.Add(pos, obj); + } + + /// <summary> + /// Underlying stream. + /// </summary> + public IPortableStream Stream + { + get; + private set; + } + + /// <summary> + /// Mark current output as raw. + /// </summary> + private void MarkRaw() + { + if (!_curRaw) + { + _curRaw = true; + + Stream.Seek(_curPos + _curRawOffset, SeekOrigin.Begin); + } + } + + /// <summary> + /// Seek field with the given ID in the current object. 
+ /// </summary> + /// <param name="fieldId">Field ID.</param> + /// <returns>True in case the field was found and position adjusted, false otherwise.</returns> + private bool SeekField(int fieldId) + { + // This method is expected to be called when stream pointer is set either before + // the field or on raw data offset. - int start = _curPos + 18; ++ int start = _curPos + PortableUtils.FullHdrLen; + int end = _curPos + _curRawOffset; + + int initial = Stream.Position; + + int cur = initial; + + while (cur < end) + { + int id = Stream.ReadInt(); + + if (fieldId == id) + { + // Field is found, return. + Stream.Seek(4, SeekOrigin.Current); + + return true; + } + + Stream.Seek(Stream.ReadInt(), SeekOrigin.Current); + + cur = Stream.Position; + } + + Stream.Seek(start, SeekOrigin.Begin); + + cur = start; + + while (cur < initial) + { + int id = Stream.ReadInt(); + + if (fieldId == id) + { + // Field is found, return. + Stream.Seek(4, SeekOrigin.Current); + + return true; + } + + Stream.Seek(Stream.ReadInt(), SeekOrigin.Current); + + cur = Stream.Position; + } + + return false; + } + + /// <summary> + /// Determines whether header at current position is HDR_NULL. + /// </summary> + private bool IsNullHeader() + { + var hdr = ReadByte(); + + return hdr != PortableUtils.HdrNull; + } + + /// <summary> + /// Seeks the field by name, reads header and returns true if field is present and header is not null. + /// </summary> + private bool SeekField(string fieldName) + { + if (_curRaw) + throw new PortableException("Cannot read named fields after raw data is read."); + + var fieldId = PortableUtils.FieldId(_curTypeId, fieldName, _curConverter, _curMapper); + + if (!SeekField(fieldId)) + return false; + + return IsNullHeader(); + } + + /// <summary> + /// Seeks specified field and invokes provided func. + /// </summary> + private T ReadField<T>(string fieldName, Func<IPortableStream, T> readFunc) + { + return SeekField(fieldName) ? 
readFunc(Stream) : default(T); + } + + /// <summary> + /// Seeks specified field and invokes provided func. + /// </summary> + private T ReadField<T>(string fieldName, Func<PortableReaderImpl, T> readFunc) + { + return SeekField(fieldName) ? readFunc(this) : default(T); + } + + /// <summary> + /// Seeks specified field and invokes provided func. + /// </summary> + private T ReadField<T>(string fieldName, Func<T> readFunc) + { + return SeekField(fieldName) ? readFunc() : default(T); + } + + /// <summary> + /// Reads header and invokes specified func if the header is not null. + /// </summary> + private T Read<T>(Func<PortableReaderImpl, T> readFunc) + { + return IsNullHeader() ? readFunc(this) : default(T); + } + + /// <summary> + /// Reads header and invokes specified func if the header is not null. + /// </summary> + private T Read<T>(Func<IPortableStream, T> readFunc) + { + return IsNullHeader() ? readFunc(Stream) : default(T); + } + + /// <summary> + /// Gets the portable user object from a byte array. 
+ /// </summary> + /// <param name="pos">Position in the current stream.</param> + /// <param name="offs">Offset in the byte array.</param> + /// <param name="bytes">Bytes.</param> + private PortableUserObject GetPortableUserObject(int pos, int offs, byte[] bytes) + { - Stream.Seek(pos + 2, SeekOrigin.Begin); ++ Stream.Seek(pos + PortableUtils.OffsetTypeId, SeekOrigin.Begin); + + var id = Stream.ReadInt(); + + var hash = Stream.ReadInt(); + + return new PortableUserObject(_marsh, bytes, offs, id, hash); + } + } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/ec58b87c/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUserObject.cs ---------------------------------------------------------------------- diff --cc modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUserObject.cs index 0000000,891f261..2bf5ab8 mode 000000,100644..100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUserObject.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Portable/PortableUserObject.cs @@@ -1,0 -1,385 +1,385 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + namespace Apache.Ignite.Core.Impl.Portable + { + using System.Collections; + using System.Collections.Generic; + using System.IO; + using System.Runtime.CompilerServices; + using System.Text; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Impl.Portable.IO; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// User portable object. + /// </summary> + internal class PortableUserObject : IPortableObject + { + /** Marshaller. */ + private readonly PortableMarshaller _marsh; + + /** Raw data of this portable object. */ + private readonly byte[] _data; + + /** Offset in data array. */ + private readonly int _offset; + + /** Type ID. */ + private readonly int _typeId; + + /** Hash code. */ + private readonly int _hashCode; + + /** Fields. */ + private volatile IDictionary<int, int> _fields; + + /** Deserialized value. */ + private object _deserialized; + + /// <summary> + /// Initializes a new instance of the <see cref="PortableUserObject"/> class. + /// </summary> + /// <param name="marsh">Marshaller.</param> + /// <param name="data">Raw data of this portable object.</param> + /// <param name="offset">Offset in data array.</param> + /// <param name="typeId">Type ID.</param> + /// <param name="hashCode">Hash code.</param> + public PortableUserObject(PortableMarshaller marsh, byte[] data, int offset, int typeId, int hashCode) + { + _marsh = marsh; + + _data = data; + _offset = offset; + + _typeId = typeId; + _hashCode = hashCode; + } + + /** <inheritdoc /> */ + public int TypeId + { + get { return _typeId; } + } + + /** <inheritdoc /> */ + public T GetField<T>(string fieldName) + { + return Field<T>(fieldName, null); + } + + /** <inheritdoc /> */ + public T Deserialize<T>() + { + return Deserialize<T>(PortableMode.Deserialize); + } + + /// <summary> + /// Internal deserialization routine. + /// </summary> + /// <param name="mode">The mode.</param> + /// <returns> + /// Deserialized object. 
+ /// </returns> + private T Deserialize<T>(PortableMode mode) + { + if (_deserialized == null) + { + IPortableStream stream = new PortableHeapStream(_data); + + stream.Seek(_offset, SeekOrigin.Begin); + + T res = _marsh.Unmarshal<T>(stream, mode); + + IPortableTypeDescriptor desc = _marsh.GetDescriptor(true, _typeId); + + if (!desc.KeepDeserialized) + return res; + + _deserialized = res; + } + + return (T)_deserialized; + } + + /** <inheritdoc /> */ + public IPortableMetadata GetMetadata() + { + return _marsh.GetMetadata(_typeId); + } + + /// <summary> + /// Raw data of this portable object. + /// </summary> + public byte[] Data + { + get { return _data; } + } + + /// <summary> + /// Offset in data array. + /// </summary> + public int Offset + { + get { return _offset; } + } + + /// <summary> + /// Get field with builder. + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="fieldName"></param> + /// <param name="builder"></param> + /// <returns></returns> + public T Field<T>(string fieldName, PortableBuilderImpl builder) + { + IPortableTypeDescriptor desc = _marsh.GetDescriptor(true, _typeId); + + InitializeFields(); + + int fieldId = PortableUtils.FieldId(_typeId, fieldName, desc.NameConverter, desc.Mapper); + + int pos; + + if (_fields.TryGetValue(fieldId, out pos)) + { + if (builder != null) + { + // Read in scope of build process. + T res; + + if (!builder.CachedField(pos, out res)) + { + res = Field0<T>(pos, builder); + + builder.CacheField(pos, res); + } + + return res; + } + return Field0<T>(pos, null); + } + return default(T); + } + + /// <summary> + /// Lazy fields initialization routine. 
+ /// </summary> + private void InitializeFields() + { + if (_fields == null) + { + IPortableStream stream = new PortableHeapStream(_data); + - stream.Seek(_offset + 14, SeekOrigin.Begin); ++ stream.Seek(_offset + PortableUtils.OffsetRawOff, SeekOrigin.Begin); + + int rawDataOffset = stream.ReadInt(); + + _fields = PortableUtils.ObjectFields(stream, _typeId, rawDataOffset); + } + } + + /// <summary> + /// Gets field value on the given object. + /// </summary> + /// <param name="pos">Position.</param> + /// <param name="builder">Builder.</param> + /// <returns>Field value.</returns> + private T Field0<T>(int pos, PortableBuilderImpl builder) + { + IPortableStream stream = new PortableHeapStream(_data); + + stream.Seek(pos, SeekOrigin.Begin); + + return _marsh.Unmarshal<T>(stream, PortableMode.ForcePortable, builder); + } + + /** <inheritdoc /> */ + public override int GetHashCode() + { + return _hashCode; + } + + /** <inheritdoc /> */ + public override bool Equals(object obj) + { + if (this == obj) + return true; + + PortableUserObject that = obj as PortableUserObject; + + if (that != null) + { + if (_data == that._data && _offset == that._offset) + return true; + + // 1. Check hash code and type IDs. + if (_hashCode == that._hashCode && _typeId == that._typeId) + { + // 2. Check if objects have the same field sets. + InitializeFields(); + that.InitializeFields(); + + if (_fields.Keys.Count != that._fields.Keys.Count) + return false; + + foreach (int id in _fields.Keys) + { + if (!that._fields.Keys.Contains(id)) + return false; + } + + // 3. Check if objects have the same field values. + foreach (KeyValuePair<int, int> field in _fields) + { + object fieldVal = Field0<object>(field.Value, null); + object thatFieldVal = that.Field0<object>(that._fields[field.Key], null); + + if (!Equals(fieldVal, thatFieldVal)) + return false; + } + + // 4. Check if objects have the same raw data. 
+ IPortableStream stream = new PortableHeapStream(_data); - stream.Seek(_offset + 10, SeekOrigin.Begin); ++ stream.Seek(_offset + PortableUtils.OffsetLen, SeekOrigin.Begin); + int len = stream.ReadInt(); + int rawOffset = stream.ReadInt(); + + IPortableStream thatStream = new PortableHeapStream(that._data); - thatStream.Seek(_offset + 10, SeekOrigin.Begin); ++ thatStream.Seek(_offset + PortableUtils.OffsetLen, SeekOrigin.Begin); + int thatLen = thatStream.ReadInt(); + int thatRawOffset = thatStream.ReadInt(); + + return PortableUtils.CompareArrays(_data, _offset + rawOffset, len - rawOffset, that._data, + that._offset + thatRawOffset, thatLen - thatRawOffset); + } + } + + return false; + } + + /** <inheritdoc /> */ + public override string ToString() + { + return ToString(new Dictionary<int, int>()); + } + + /// <summary> + /// ToString implementation. + /// </summary> + /// <param name="handled">Already handled objects.</param> + /// <returns>Object string.</returns> + private string ToString(IDictionary<int, int> handled) + { + int idHash; + + bool alreadyHandled = handled.TryGetValue(_offset, out idHash); + + if (!alreadyHandled) + idHash = RuntimeHelpers.GetHashCode(this); + + StringBuilder sb; + + IPortableTypeDescriptor desc = _marsh.GetDescriptor(true, _typeId); + + IPortableMetadata meta; + + try + { + meta = _marsh.GetMetadata(_typeId); + } + catch (IgniteException) + { + meta = null; + } + + if (meta == null) + sb = new StringBuilder("PortableObject [typeId=").Append(_typeId).Append(", idHash=" + idHash); + else + { + sb = new StringBuilder(meta.TypeName).Append(" [idHash=" + idHash); + + if (!alreadyHandled) + { + handled[_offset] = idHash; + + InitializeFields(); + + foreach (string fieldName in meta.Fields) + { + sb.Append(", "); + + int fieldId = PortableUtils.FieldId(_typeId, fieldName, desc.NameConverter, desc.Mapper); + + int fieldPos; + + if (_fields.TryGetValue(fieldId, out fieldPos)) + { + sb.Append(fieldName).Append('='); + + ToString0(sb, 
Field0<object>(fieldPos, null), handled); + } + } + } + else + sb.Append(", ..."); + } + + sb.Append(']'); + + return sb.ToString(); + } + + /// <summary> + /// Internal ToString routine with correct collections printout. + /// </summary> + /// <param name="sb">String builder.</param> + /// <param name="obj">Object to print.</param> + /// <param name="handled">Already handled objects.</param> + /// <returns>The same string builder.</returns> + private static void ToString0(StringBuilder sb, object obj, IDictionary<int, int> handled) + { + IEnumerable col = (obj is string) ? null : obj as IEnumerable; + + if (col == null) + { + PortableUserObject obj0 = obj as PortableUserObject; + + sb.Append(obj0 == null ? obj : obj0.ToString(handled)); + } + else + { + sb.Append('['); + + bool first = true; + + foreach (object elem in col) + { + if (first) + first = false; + else + sb.Append(", "); + + ToString0(sb, elem, handled); + } + + sb.Append(']'); + } + } + } + }
