// Origin: modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableAbstractStream.cs
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Portable.IO
{
    using System;
    using System.Diagnostics.CodeAnalysis;
    using System.IO;
    using System.Reflection;
    using System.Text;

    /// <summary>
    /// Base class for managed and unmanaged data streams.
    /// <para>
    /// Multi-byte values are always laid out in little-endian order inside the stream.
    /// On a little-endian host values are copied verbatim; on a big-endian host the bytes
    /// of every element are reversed while copying.
    /// </para>
    /// <para>
    /// The <c>*0</c> helper routines operate on a raw pointer supplied by the subclass and
    /// perform no bounds checks; the subclass is expected to reserve capacity first.
    /// </para>
    /// </summary>
    [CLSCompliant(false)]
    internal unsafe abstract class PortableAbstractStream : IPortableStream
    {
        /// <summary>
        /// Array copy delegate. The meaning of the two pointers (source/destination) depends on
        /// which runtime method was bound, see <see cref="MemcpyInverted"/>.
        /// </summary>
        private delegate void MemCopy(byte* a1, byte* a2, int len);

        /** memcpy function handle. */
        private static readonly MemCopy Memcpy;

        /** Whether src and dest arguments are inverted (i.e. the bound method expects dest first). */
        private static readonly bool MemcpyInverted;

        /** Byte: zero. */
        protected const byte ByteZero = 0;

        /** Byte: one. */
        protected const byte ByteOne = 1;

        /** LITTLE_ENDIAN flag. */
        protected static readonly bool LittleEndian = BitConverter.IsLittleEndian;

        /** Position. */
        protected int Pos;

        /** Disposed flag. */
        private bool _disposed;

        /// <summary>
        /// Static initializer. Binds <see cref="Memcpy"/> to a non-public memory copy routine of
        /// <see cref="Buffer"/> located through reflection.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Neither of the known memory copy routines exists on the current runtime.
        /// </exception>
        [SuppressMessage("Microsoft.Design", "CA1065:DoNotRaiseExceptionsInUnexpectedLocations")]
        static PortableAbstractStream()
        {
            Type type = typeof(Buffer);

            const BindingFlags flags = BindingFlags.Static | BindingFlags.NonPublic;
            Type[] paramTypes = { typeof(byte*), typeof(byte*), typeof(int) };

            // Assume .Net 4.5: "Memcpy" is invoked with the destination as the first argument.
            MethodInfo mthd = type.GetMethod("Memcpy", flags, null, paramTypes, null);

            bool inverted = mthd != null;

            if (mthd == null)
            {
                // Assume .Net 4.0: "memcpyimpl" is invoked with the source as the first argument.
                mthd = type.GetMethod("memcpyimpl", flags, null, paramTypes, null);

                if (mthd == null)
                    throw new InvalidOperationException("Unable to get memory copy function delegate.");
            }

            MemcpyInverted = inverted;

            Memcpy = (MemCopy)Delegate.CreateDelegate(typeof(MemCopy), mthd);
        }

        /// <summary>
        /// Write byte.
        /// </summary>
        /// <param name="val">Byte value.</param>
        public abstract void WriteByte(byte val);

        /// <summary>
        /// Read byte.
        /// </summary>
        /// <returns>Byte value.</returns>
        public abstract byte ReadByte();

        /// <summary>
        /// Write byte array.
        /// </summary>
        /// <param name="val">Byte array.</param>
        public abstract void WriteByteArray(byte[] val);

        /// <summary>
        /// Internal routine to write byte array: copies <c>val.Length</c> bytes to <paramref name="data"/>.
        /// </summary>
        /// <param name="val">Byte array.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected void WriteByteArray0(byte[] val, byte* data)
        {
            fixed (byte* val0 = val)
            {
                CopyMemory(val0, data, val.Length);
            }
        }

        /// <summary>
        /// Read byte array.
        /// </summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Byte array.</returns>
        public abstract byte[] ReadByteArray(int cnt);

        /// <summary>
        /// Internal routine to read byte array: copies <paramref name="len"/> bytes into a new array.
        /// </summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Byte array.</returns>
        protected byte[] ReadByteArray0(int len, byte* data)
        {
            byte[] res = new byte[len];

            fixed (byte* res0 = res)
            {
                CopyMemory(data, res0, len);
            }

            return res;
        }

        /// <summary>
        /// Write bool as a single byte (<see cref="ByteOne"/> or <see cref="ByteZero"/>).
        /// </summary>
        /// <param name="val">Bool value.</param>
        public void WriteBool(bool val)
        {
            WriteByte(val ? ByteOne : ByteZero);
        }

        /// <summary>
        /// Read bool.
        /// </summary>
        /// <returns><c>True</c> only if the byte read equals <see cref="ByteOne"/>.</returns>
        public bool ReadBool()
        {
            return ReadByte() == ByteOne;
        }

        /// <summary>
        /// Write bool array.
        /// </summary>
        /// <param name="val">Bool array.</param>
        public abstract void WriteBoolArray(bool[] val);

        /// <summary>
        /// Internal routine to write bool array: one byte per element, copied verbatim.
        /// </summary>
        /// <param name="val">Bool array.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected void WriteBoolArray0(bool[] val, byte* data)
        {
            fixed (bool* val0 = val)
            {
                CopyMemory((byte*)val0, data, val.Length);
            }
        }

        /// <summary>
        /// Read bool array.
        /// </summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Bool array.</returns>
        public abstract bool[] ReadBoolArray(int cnt);

        /// <summary>
        /// Internal routine to read bool array: one byte per element, copied verbatim.
        /// </summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Bool array.</returns>
        protected bool[] ReadBoolArray0(int len, byte* data)
        {
            bool[] res = new bool[len];

            fixed (bool* res0 = res)
            {
                CopyMemory(data, (byte*)res0, len);
            }

            return res;
        }

        /// <summary>
        /// Write short.
        /// </summary>
        /// <param name="val">Short value.</param>
        public abstract void WriteShort(short val);

        /// <summary>
        /// Internal routine to write short value.
        /// </summary>
        /// <param name="val">Short value.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected void WriteShort0(short val, byte* data)
        {
            if (LittleEndian)
                *((short*)data) = val;
            else
                CopyReversed((byte*)&val, data, 1, sizeof(short));
        }

        /// <summary>
        /// Read short.
        /// </summary>
        /// <returns>Short value.</returns>
        public abstract short ReadShort();

        /// <summary>
        /// Internal routine to read short value.
        /// </summary>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Short value.</returns>
        protected short ReadShort0(byte* data)
        {
            short val;

            if (LittleEndian)
                val = *((short*)data);
            else
                CopyReversed(data, (byte*)&val, 1, sizeof(short));

            return val;
        }

        /// <summary>
        /// Write short array.
        /// </summary>
        /// <param name="val">Short array.</param>
        public abstract void WriteShortArray(short[] val);

        /// <summary>
        /// Internal routine to write short array.
        /// </summary>
        /// <param name="val">Short array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        protected void WriteShortArray0(short[] val, byte* data, int cnt)
        {
            fixed (short* val0 = val)
            {
                if (LittleEndian)
                    CopyMemory((byte*)val0, data, cnt);
                else
                    CopyReversed((byte*)val0, data, val.Length, sizeof(short));
            }
        }

        /// <summary>
        /// Read short array.
        /// </summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Short array.</returns>
        public abstract short[] ReadShortArray(int cnt);

        /// <summary>
        /// Internal routine to read short array.
        /// </summary>
        /// <param name="len">Array length (elements).</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        /// <returns>Short array.</returns>
        protected short[] ReadShortArray0(int len, byte* data, int cnt)
        {
            short[] res = new short[len];

            fixed (short* res0 = res)
            {
                if (LittleEndian)
                    CopyMemory(data, (byte*)res0, cnt);
                else
                    CopyReversed(data, (byte*)res0, len, sizeof(short));
            }

            return res;
        }

        /// <summary>
        /// Write char. The char is reinterpreted as a 16-bit value and written via <see cref="WriteShort"/>.
        /// </summary>
        /// <param name="val">Char value.</param>
        public void WriteChar(char val)
        {
            WriteShort(*(short*)(&val));
        }

        /// <summary>
        /// Read char. A 16-bit value is read via <see cref="ReadShort"/> and reinterpreted as char.
        /// </summary>
        /// <returns>Char value.</returns>
        public char ReadChar()
        {
            short val = ReadShort();

            return *(char*)(&val);
        }

        /// <summary>
        /// Write char array.
        /// </summary>
        /// <param name="val">Char array.</param>
        public abstract void WriteCharArray(char[] val);

        /// <summary>
        /// Internal routine to write char array.
        /// </summary>
        /// <param name="val">Char array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        protected void WriteCharArray0(char[] val, byte* data, int cnt)
        {
            fixed (char* val0 = val)
            {
                if (LittleEndian)
                    CopyMemory((byte*)val0, data, cnt);
                else
                    CopyReversed((byte*)val0, data, val.Length, sizeof(char));
            }
        }

        /// <summary>
        /// Read char array.
        /// </summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Char array.</returns>
        public abstract char[] ReadCharArray(int cnt);

        /// <summary>
        /// Internal routine to read char array.
        /// </summary>
        /// <param name="len">Array length (elements).</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        /// <returns>Char array.</returns>
        protected char[] ReadCharArray0(int len, byte* data, int cnt)
        {
            char[] res = new char[len];

            fixed (char* res0 = res)
            {
                if (LittleEndian)
                    CopyMemory(data, (byte*)res0, cnt);
                else
                    CopyReversed(data, (byte*)res0, len, sizeof(char));
            }

            return res;
        }

        /// <summary>
        /// Write int.
        /// </summary>
        /// <param name="val">Int value.</param>
        public abstract void WriteInt(int val);

        /// <summary>
        /// Write int to specific position.
        /// </summary>
        /// <param name="writePos">Position.</param>
        /// <param name="val">Value.</param>
        public abstract void WriteInt(int writePos, int val);

        /// <summary>
        /// Internal routine to write int value.
        /// </summary>
        /// <param name="val">Int value.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected void WriteInt0(int val, byte* data)
        {
            if (LittleEndian)
                *((int*)data) = val;
            else
                CopyReversed((byte*)&val, data, 1, sizeof(int));
        }

        /// <summary>
        /// Read int.
        /// </summary>
        /// <returns>Int value.</returns>
        public abstract int ReadInt();

        /// <summary>
        /// Internal routine to read int value.
        /// </summary>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Int value.</returns>
        protected int ReadInt0(byte* data)
        {
            int val;

            if (LittleEndian)
                val = *((int*)data);
            else
                CopyReversed(data, (byte*)&val, 1, sizeof(int));

            return val;
        }

        /// <summary>
        /// Write int array.
        /// </summary>
        /// <param name="val">Int array.</param>
        public abstract void WriteIntArray(int[] val);

        /// <summary>
        /// Internal routine to write int array.
        /// </summary>
        /// <param name="val">Int array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        protected void WriteIntArray0(int[] val, byte* data, int cnt)
        {
            fixed (int* val0 = val)
            {
                if (LittleEndian)
                    CopyMemory((byte*)val0, data, cnt);
                else
                    CopyReversed((byte*)val0, data, val.Length, sizeof(int));
            }
        }

        /// <summary>
        /// Read int array.
        /// </summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Int array.</returns>
        public abstract int[] ReadIntArray(int cnt);

        /// <summary>
        /// Internal routine to read int array.
        /// </summary>
        /// <param name="len">Array length (elements).</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        /// <returns>Int array.</returns>
        protected int[] ReadIntArray0(int len, byte* data, int cnt)
        {
            int[] res = new int[len];

            fixed (int* res0 = res)
            {
                if (LittleEndian)
                    CopyMemory(data, (byte*)res0, cnt);
                else
                    CopyReversed(data, (byte*)res0, len, sizeof(int));
            }

            return res;
        }

        /// <summary>
        /// Write float. The raw 32 bits of the value are written via <see cref="WriteInt(int)"/>.
        /// </summary>
        /// <param name="val">Float value.</param>
        public void WriteFloat(float val)
        {
            int val0 = *(int*)(&val);

            WriteInt(val0);
        }

        /// <summary>
        /// Read float. 32 bits are read via <see cref="ReadInt"/> and reinterpreted as float.
        /// </summary>
        /// <returns>Float value.</returns>
        public float ReadFloat()
        {
            int val = ReadInt();

            return *(float*)(&val);
        }

        /// <summary>
        /// Write float array.
        /// </summary>
        /// <param name="val">Float array.</param>
        public abstract void WriteFloatArray(float[] val);

        /// <summary>
        /// Internal routine to write float array.
        /// </summary>
        /// <param name="val">Float array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        protected void WriteFloatArray0(float[] val, byte* data, int cnt)
        {
            fixed (float* val0 = val)
            {
                if (LittleEndian)
                    CopyMemory((byte*)val0, data, cnt);
                else
                    CopyReversed((byte*)val0, data, val.Length, sizeof(float));
            }
        }

        /// <summary>
        /// Read float array.
        /// </summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Float array.</returns>
        public abstract float[] ReadFloatArray(int cnt);

        /// <summary>
        /// Internal routine to read float array.
        /// </summary>
        /// <param name="len">Array length (elements).</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        /// <returns>Float array.</returns>
        protected float[] ReadFloatArray0(int len, byte* data, int cnt)
        {
            float[] res = new float[len];

            fixed (float* res0 = res)
            {
                if (LittleEndian)
                    CopyMemory(data, (byte*)res0, cnt);
                else
                    // Bytes are reversed straight into float storage. Going through an int
                    // temporary and assigning it to a float element would convert the integer
                    // numerically instead of reinterpreting its bits.
                    CopyReversed(data, (byte*)res0, len, sizeof(float));
            }

            return res;
        }

        /// <summary>
        /// Write long.
        /// </summary>
        /// <param name="val">Long value.</param>
        public abstract void WriteLong(long val);

        /// <summary>
        /// Internal routine to write long value.
        /// </summary>
        /// <param name="val">Long value.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected void WriteLong0(long val, byte* data)
        {
            if (LittleEndian)
                *((long*)data) = val;
            else
                CopyReversed((byte*)&val, data, 1, sizeof(long));
        }

        /// <summary>
        /// Read long.
        /// </summary>
        /// <returns>Long value.</returns>
        public abstract long ReadLong();

        /// <summary>
        /// Internal routine to read long value.
        /// </summary>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Long value.</returns>
        protected long ReadLong0(byte* data)
        {
            long val;

            if (LittleEndian)
                val = *((long*)data);
            else
                CopyReversed(data, (byte*)&val, 1, sizeof(long));

            return val;
        }

        /// <summary>
        /// Write long array.
        /// </summary>
        /// <param name="val">Long array.</param>
        public abstract void WriteLongArray(long[] val);

        /// <summary>
        /// Internal routine to write long array.
        /// </summary>
        /// <param name="val">Long array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        protected void WriteLongArray0(long[] val, byte* data, int cnt)
        {
            fixed (long* val0 = val)
            {
                if (LittleEndian)
                    CopyMemory((byte*)val0, data, cnt);
                else
                    CopyReversed((byte*)val0, data, val.Length, sizeof(long));
            }
        }

        /// <summary>
        /// Read long array.
        /// </summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Long array.</returns>
        public abstract long[] ReadLongArray(int cnt);

        /// <summary>
        /// Internal routine to read long array.
        /// </summary>
        /// <param name="len">Array length (elements).</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        /// <returns>Long array.</returns>
        protected long[] ReadLongArray0(int len, byte* data, int cnt)
        {
            long[] res = new long[len];

            fixed (long* res0 = res)
            {
                if (LittleEndian)
                    CopyMemory(data, (byte*)res0, cnt);
                else
                    CopyReversed(data, (byte*)res0, len, sizeof(long));
            }

            return res;
        }

        /// <summary>
        /// Write double. The raw 64 bits of the value are written via <see cref="WriteLong"/>.
        /// </summary>
        /// <param name="val">Double value.</param>
        public void WriteDouble(double val)
        {
            long val0 = *(long*)(&val);

            WriteLong(val0);
        }

        /// <summary>
        /// Read double. 64 bits are read via <see cref="ReadLong"/> and reinterpreted as double.
        /// </summary>
        /// <returns>Double value.</returns>
        public double ReadDouble()
        {
            long val = ReadLong();

            return *(double*)(&val);
        }

        /// <summary>
        /// Write double array.
        /// </summary>
        /// <param name="val">Double array.</param>
        public abstract void WriteDoubleArray(double[] val);

        /// <summary>
        /// Internal routine to write double array.
        /// </summary>
        /// <param name="val">Double array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        protected void WriteDoubleArray0(double[] val, byte* data, int cnt)
        {
            fixed (double* val0 = val)
            {
                if (LittleEndian)
                    CopyMemory((byte*)val0, data, cnt);
                else
                    CopyReversed((byte*)val0, data, val.Length, sizeof(double));
            }
        }

        /// <summary>
        /// Read double array.
        /// </summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Double array.</returns>
        public abstract double[] ReadDoubleArray(int cnt);

        /// <summary>
        /// Internal routine to read double array.
        /// </summary>
        /// <param name="len">Array length (elements).</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count (used by the verbatim copy on little-endian hosts).</param>
        /// <returns>Double array.</returns>
        protected double[] ReadDoubleArray0(int len, byte* data, int cnt)
        {
            double[] res = new double[len];

            fixed (double* res0 = res)
            {
                if (LittleEndian)
                    CopyMemory(data, (byte*)res0, cnt);
                else
                    CopyReversed(data, (byte*)res0, len, sizeof(double));
            }

            return res;
        }

        /// <summary>
        /// Write string.
        /// </summary>
        /// <param name="chars">Characters.</param>
        /// <param name="charCnt">Char count.</param>
        /// <param name="byteCnt">Byte count.</param>
        /// <param name="encoding">Encoding.</param>
        /// <returns>Amount of bytes written.</returns>
        public abstract int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding);

        /// <summary>
        /// Internal string write routine: encodes the characters directly into <paramref name="data"/>.
        /// </summary>
        /// <param name="chars">Chars.</param>
        /// <param name="charCnt">Chars count.</param>
        /// <param name="byteCnt">Bytes count (size of the destination region).</param>
        /// <param name="enc">Encoding.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <returns>Amount of bytes written.</returns>
        protected int WriteString0(char* chars, int charCnt, int byteCnt, Encoding enc, byte* data)
        {
            return enc.GetBytes(chars, charCnt, data, byteCnt);
        }

        /// <summary>
        /// Write arbitrary data.
        /// </summary>
        /// <param name="src">Source array.</param>
        /// <param name="off">Offset.</param>
        /// <param name="cnt">Count.</param>
        public void Write(byte[] src, int off, int cnt)
        {
            fixed (byte* src0 = src)
            {
                Write(src0 + off, cnt);
            }
        }

        /// <summary>
        /// Read arbitrary data.
        /// </summary>
        /// <param name="dest">Destination array.</param>
        /// <param name="off">Offset.</param>
        /// <param name="cnt">Count.</param>
        public void Read(byte[] dest, int off, int cnt)
        {
            fixed (byte* dest0 = dest)
            {
                Read(dest0 + off, cnt);
            }
        }

        /// <summary>
        /// Write arbitrary data.
        /// </summary>
        /// <param name="src">Source.</param>
        /// <param name="cnt">Count.</param>
        public abstract void Write(byte* src, int cnt);

        /// <summary>
        /// Internal write routine: copies <paramref name="cnt"/> bytes to the current position of
        /// <paramref name="data"/>. The position itself is not changed here.
        /// </summary>
        /// <param name="src">Source.</param>
        /// <param name="cnt">Count.</param>
        /// <param name="data">Data (destination).</param>
        protected void WriteInternal(byte* src, int cnt, byte* data)
        {
            CopyMemory(src, data + Pos, cnt);
        }

        /// <summary>
        /// Read arbitrary data.
        /// </summary>
        /// <param name="dest">Destination.</param>
        /// <param name="cnt">Count.</param>
        public abstract void Read(byte* dest, int cnt);

        /// <summary>
        /// Internal read routine: copies at most <paramref name="cnt"/> bytes, limited by
        /// <see cref="Remaining"/>, from the current position and advances the position by the
        /// number of bytes actually copied.
        /// </summary>
        /// <param name="dest">Destination.</param>
        /// <param name="cnt">Count.</param>
        /// <param name="data">Data (source).</param>
        protected void ReadInternal(byte* dest, int cnt, byte* data)
        {
            int cnt0 = Math.Min(Remaining(), cnt);

            CopyMemory(data + Pos, dest, cnt0);

            ShiftRead(cnt0);
        }

        /// <summary>
        /// Position.
        /// </summary>
        public int Position
        {
            get { return Pos; }
        }

        /// <summary>
        /// Gets remaining bytes in the stream.
        /// </summary>
        /// <returns>Remaining bytes.</returns>
        public abstract int Remaining();

        /// <summary>
        /// Gets underlying array, avoiding copying if possible.
        /// </summary>
        /// <returns>Underlying array.</returns>
        public abstract byte[] Array();

        /// <summary>
        /// Gets underlying data in a new array.
        /// </summary>
        /// <returns>New array with data.</returns>
        public abstract byte[] ArrayCopy();

        /// <summary>
        /// Check whether array passed as argument is the same as the stream hosts.
        /// The base implementation always answers <c>false</c>.
        /// </summary>
        /// <param name="arr">Array.</param>
        /// <returns><c>True</c> if they are same.</returns>
        public virtual bool IsSameArray(byte[] arr)
        {
            return false;
        }

        /// <summary>
        /// Seek to the given position. Only <see cref="SeekOrigin.Begin"/> and
        /// <see cref="SeekOrigin.Current"/> are supported.
        /// </summary>
        /// <param name="offset">Offset.</param>
        /// <param name="origin">Seek origin.</param>
        /// <returns>New position.</returns>
        /// <exception cref="System.ArgumentException">
        /// Unsupported seek origin, or the resulting position is negative.
        /// </exception>
        public int Seek(int offset, SeekOrigin origin)
        {
            int newPos;

            switch (origin)
            {
                case SeekOrigin.Begin:
                    newPos = offset;

                    break;

                case SeekOrigin.Current:
                    newPos = Pos + offset;

                    break;

                default:
                    throw new ArgumentException("Unsupported seek origin: " + origin);
            }

            if (newPos < 0)
                throw new ArgumentException("Seek before origin: " + newPos);

            // Make sure the target position is backed by storage before moving there.
            EnsureWriteCapacity(newPos);

            Pos = newPos;

            return Pos;
        }

        /** <inheritdoc /> */
        public void Dispose()
        {
            if (_disposed)
                return;

            Dispose(true);

            GC.SuppressFinalize(this);

            _disposed = true;
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        /// <param name="disposing">Passed as <c>true</c> from <see cref="Dispose()"/>.</param>
        protected abstract void Dispose(bool disposing);

        /// <summary>
        /// Ensure capacity for write.
        /// </summary>
        /// <param name="cnt">
        /// Required capacity. Callers in this class pass an absolute offset
        /// (<c>Pos + bytes</c> or the seek target), not a relative byte count.
        /// </param>
        protected abstract void EnsureWriteCapacity(int cnt);

        /// <summary>
        /// Ensure capacity for write and shift position.
        /// </summary>
        /// <param name="cnt">Bytes count.</param>
        /// <returns>Position before shift.</returns>
        protected int EnsureWriteCapacityAndShift(int cnt)
        {
            int pos0 = Pos;

            EnsureWriteCapacity(Pos + cnt);

            ShiftWrite(cnt);

            return pos0;
        }

        /// <summary>
        /// Ensure capacity for read.
        /// </summary>
        /// <param name="cnt">Bytes count.</param>
        protected abstract void EnsureReadCapacity(int cnt);

        /// <summary>
        /// Ensure capacity for read and shift position.
        /// </summary>
        /// <param name="cnt">Bytes count.</param>
        /// <returns>Position before shift.</returns>
        protected int EnsureReadCapacityAndShift(int cnt)
        {
            int pos0 = Pos;

            EnsureReadCapacity(cnt);

            ShiftRead(cnt);

            return pos0;
        }

        /// <summary>
        /// Shift position due to write.
        /// </summary>
        /// <param name="cnt">Bytes count.</param>
        protected void ShiftWrite(int cnt)
        {
            Pos += cnt;
        }

        /// <summary>
        /// Shift position due to read.
        /// </summary>
        /// <param name="cnt">Bytes count.</param>
        protected void ShiftRead(int cnt)
        {
            Pos += cnt;
        }

        /// <summary>
        /// Calculate new capacity: 256 when less than 256 is required, otherwise the larger of
        /// twice the current capacity and the required capacity.
        /// </summary>
        /// <param name="curCap">Current capacity.</param>
        /// <param name="reqCap">Required capacity.</param>
        /// <returns>New capacity.</returns>
        protected static int Capacity(int curCap, int reqCap)
        {
            int newCap;

            if (reqCap < 256)
                newCap = 256;
            else
            {
                newCap = curCap << 1;

                // Also covers the case where doubling overflowed into a negative number.
                if (newCap < reqCap)
                    newCap = reqCap;
            }

            return newCap;
        }

        /// <summary>
        /// Unsafe memory copy routine.
        /// </summary>
        /// <param name="src">Source.</param>
        /// <param name="dest">Destination.</param>
        /// <param name="len">Length in bytes.</param>
        public static void CopyMemory(byte* src, byte* dest, int len)
        {
            if (MemcpyInverted)
                Memcpy.Invoke(dest, src, len);
            else
                Memcpy.Invoke(src, dest, len);
        }

        /// <summary>
        /// Copies <paramref name="elemCnt"/> elements of <paramref name="elemSize"/> bytes each from
        /// <paramref name="src"/> to <paramref name="dest"/>, reversing the byte order inside every
        /// element. Used to convert between host byte order and stream byte order on big-endian hosts.
        /// </summary>
        /// <param name="src">Source.</param>
        /// <param name="dest">Destination; must not overlap the source.</param>
        /// <param name="elemCnt">Number of elements.</param>
        /// <param name="elemSize">Size of a single element in bytes.</param>
        private static void CopyReversed(byte* src, byte* dest, int elemCnt, int elemSize)
        {
            for (int i = 0; i < elemCnt; i++)
            {
                for (int j = 0; j < elemSize; j++)
                    dest[j] = src[elemSize - 1 - j];

                src += elemSize;
                dest += elemSize;
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs new file mode 100644 index 0000000..690f92c --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableHeapStream.cs @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable.IO +{ + using System; + using System.IO; + using System.Text; + + /// <summary> + /// Portable onheap stream. + /// </summary> + internal unsafe class PortableHeapStream : PortableAbstractStream + { + /** Data array. */ + protected byte[] Data; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="cap">Initial capacity.</param> + public PortableHeapStream(int cap) + { + Data = new byte[cap]; + } + + /// <summary> + /// Constructor. 
+ /// </summary> + /// <param name="data">Data array.</param> + public PortableHeapStream(byte[] data) + { + Data = data; + } + + /** <inheritdoc /> */ + public override void WriteByte(byte val) + { + int pos0 = EnsureWriteCapacityAndShift(1); + + Data[pos0] = val; + } + + /** <inheritdoc /> */ + public override byte ReadByte() + { + int pos0 = EnsureReadCapacityAndShift(1); + + return Data[pos0]; + } + + /** <inheritdoc /> */ + public override void WriteByteArray(byte[] val) + { + int pos0 = EnsureWriteCapacityAndShift(val.Length); + + fixed (byte* data0 = Data) + { + WriteByteArray0(val, data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override byte[] ReadByteArray(int cnt) + { + int pos0 = EnsureReadCapacityAndShift(cnt); + + fixed (byte* data0 = Data) + { + return ReadByteArray0(cnt, data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override void WriteBoolArray(bool[] val) + { + int pos0 = EnsureWriteCapacityAndShift(val.Length); + + fixed (byte* data0 = Data) + { + WriteBoolArray0(val, data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override bool[] ReadBoolArray(int cnt) + { + int pos0 = EnsureReadCapacityAndShift(cnt); + + fixed (byte* data0 = Data) + { + return ReadBoolArray0(cnt, data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override void WriteShort(short val) + { + int pos0 = EnsureWriteCapacityAndShift(2); + + fixed (byte* data0 = Data) + { + WriteShort0(val, data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override short ReadShort() + { + int pos0 = EnsureReadCapacityAndShift(2); + + fixed (byte* data0 = Data) + { + return ReadShort0(data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override void WriteShortArray(short[] val) + { + int cnt = val.Length << 1; + + int pos0 = EnsureWriteCapacityAndShift(cnt); + + fixed (byte* data0 = Data) + { + WriteShortArray0(val, data0 + pos0, cnt); + } + } + + /** <inheritdoc /> */ + public override short[] ReadShortArray(int cnt) + { + int cnt0 = cnt << 1; + + int 
pos0 = EnsureReadCapacityAndShift(cnt0); + + fixed (byte* data0 = Data) + { + return ReadShortArray0(cnt, data0 + pos0, cnt0); + } + } + + /** <inheritdoc /> */ + public override void WriteCharArray(char[] val) + { + int cnt = val.Length << 1; + + int pos0 = EnsureWriteCapacityAndShift(cnt); + + fixed (byte* data0 = Data) + { + WriteCharArray0(val, data0 + pos0, cnt); + } + } + + /** <inheritdoc /> */ + public override char[] ReadCharArray(int cnt) + { + int cnt0 = cnt << 1; + + int pos0 = EnsureReadCapacityAndShift(cnt0); + + fixed (byte* data0 = Data) + { + return ReadCharArray0(cnt, data0 + pos0, cnt0); + } + } + + /** <inheritdoc /> */ + public override void WriteInt(int val) + { + int pos0 = EnsureWriteCapacityAndShift(4); + + fixed (byte* data0 = Data) + { + WriteInt0(val, data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override void WriteInt(int writePos, int val) + { + EnsureWriteCapacity(writePos + 4); + + fixed (byte* data0 = Data) + { + WriteInt0(val, data0 + writePos); + } + } + + /** <inheritdoc /> */ + public override int ReadInt() + { + int pos0 = EnsureReadCapacityAndShift(4); + + fixed (byte* data0 = Data) + { + return ReadInt0(data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override void WriteIntArray(int[] val) + { + int cnt = val.Length << 2; + + int pos0 = EnsureWriteCapacityAndShift(cnt); + + fixed (byte* data0 = Data) + { + WriteIntArray0(val, data0 + pos0, cnt); + } + } + + /** <inheritdoc /> */ + public override int[] ReadIntArray(int cnt) + { + int cnt0 = cnt << 2; + + int pos0 = EnsureReadCapacityAndShift(cnt0); + + fixed (byte* data0 = Data) + { + return ReadIntArray0(cnt, data0 + pos0, cnt0); + } + } + + /** <inheritdoc /> */ + public override void WriteFloatArray(float[] val) + { + int cnt = val.Length << 2; + + int pos0 = EnsureWriteCapacityAndShift(cnt); + + fixed (byte* data0 = Data) + { + WriteFloatArray0(val, data0 + pos0, cnt); + } + } + + /** <inheritdoc /> */ + public override float[] ReadFloatArray(int cnt) 
+ { + int cnt0 = cnt << 2; + + int pos0 = EnsureReadCapacityAndShift(cnt0); + + fixed (byte* data0 = Data) + { + return ReadFloatArray0(cnt, data0 + pos0, cnt0); + } + } + + /** <inheritdoc /> */ + public override void WriteLong(long val) + { + int pos0 = EnsureWriteCapacityAndShift(8); + + fixed (byte* data0 = Data) + { + WriteLong0(val, data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override long ReadLong() + { + int pos0 = EnsureReadCapacityAndShift(8); + + fixed (byte* data0 = Data) + { + return ReadLong0(data0 + pos0); + } + } + + /** <inheritdoc /> */ + public override void WriteLongArray(long[] val) + { + int cnt = val.Length << 3; + + int pos0 = EnsureWriteCapacityAndShift(cnt); + + fixed (byte* data0 = Data) + { + WriteLongArray0(val, data0 + pos0, cnt); + } + } + + /** <inheritdoc /> */ + public override long[] ReadLongArray(int cnt) + { + int cnt0 = cnt << 3; + + int pos0 = EnsureReadCapacityAndShift(cnt0); + + fixed (byte* data0 = Data) + { + return ReadLongArray0(cnt, data0 + pos0, cnt0); + } + } + + /** <inheritdoc /> */ + public override void WriteDoubleArray(double[] val) + { + int cnt = val.Length << 3; + + int pos0 = EnsureWriteCapacityAndShift(cnt); + + fixed (byte* data0 = Data) + { + WriteDoubleArray0(val, data0 + pos0, cnt); + } + } + + /** <inheritdoc /> */ + public override double[] ReadDoubleArray(int cnt) + { + int cnt0 = cnt << 3; + + int pos0 = EnsureReadCapacityAndShift(cnt0); + + fixed (byte* data0 = Data) + { + return ReadDoubleArray0(cnt, data0 + pos0, cnt0); + } + } + + /** <inheritdoc /> */ + public override int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding) + { + int pos0 = EnsureWriteCapacityAndShift(byteCnt); + + int written; + + fixed (byte* data0 = Data) + { + written = WriteString0(chars, charCnt, byteCnt, encoding, data0 + pos0); + } + + return written; + } + + /** <inheritdoc /> */ + public override void Write(byte* src, int cnt) + { + EnsureWriteCapacity(Pos + cnt); + + fixed (byte* data0 
= Data) + { + WriteInternal(src, cnt, data0); + } + + ShiftWrite(cnt); + } + + /** <inheritdoc /> */ + public override void Read(byte* dest, int cnt) + { + fixed (byte* data0 = Data) + { + ReadInternal(dest, cnt, data0); + } + } + + /** <inheritdoc /> */ + public override int Remaining() + { + return Data.Length - Pos; + } + + /** <inheritdoc /> */ + public override byte[] Array() + { + return Data; + } + + /** <inheritdoc /> */ + public override byte[] ArrayCopy() + { + byte[] copy = new byte[Pos]; + + Buffer.BlockCopy(Data, 0, copy, 0, Pos); + + return copy; + } + + /** <inheritdoc /> */ + public override bool IsSameArray(byte[] arr) + { + return Data == arr; + } + + /** <inheritdoc /> */ + protected override void Dispose(bool disposing) + { + // No-op. + } + + /// <summary> + /// Internal array. + /// </summary> + internal byte[] InternalArray + { + get { return Data; } + } + + /** <inheritdoc /> */ + protected override void EnsureWriteCapacity(int cnt) + { + if (cnt > Data.Length) + { + int newCap = Capacity(Data.Length, cnt); + + byte[] data0 = new byte[newCap]; + + // Copy the whole initial array length here because it can be changed + // from Java without position adjusting. 
+ Buffer.BlockCopy(Data, 0, data0, 0, Data.Length); + + Data = data0; + } + } + + /** <inheritdoc /> */ + protected override void EnsureReadCapacity(int cnt) + { + if (Data.Length - Pos < cnt) + throw new EndOfStreamException("Not enough data in stream [expected=" + cnt + + ", remaining=" + (Data.Length - Pos) + ']'); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableStreamAdapter.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableStreamAdapter.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableStreamAdapter.cs new file mode 100644 index 0000000..1d17f89 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Io/PortableStreamAdapter.cs @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable.IO +{ + using System; + using System.IO; + + /// <summary> + /// Adapter providing .Net streaming functionality over the portable stream. 
+ /// </summary> + internal class PortableStreamAdapter : Stream + { + /// <summary> + /// Underlying portable stream that all reads and writes are delegated to. + /// </summary> + private readonly IPortableStream _stream; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="stream">Stream.</param> + public PortableStreamAdapter(IPortableStream stream) + { + _stream = stream; + } + + /** <inheritDoc /> */ + public override void Write(byte[] buffer, int offset, int count) + { + _stream.Write(buffer, offset, count); + } + + /** <inheritDoc /> */ + public override int Read(byte[] buffer, int offset, int count) + { + _stream.Read(buffer, offset, count); + + // The requested count is always reported as read; a short read is never signalled here. + return count; + } + + /** <inheritDoc /> */ + public override void Flush() + { + // No-op. + } + + /** <inheritDoc /> */ + public override bool CanRead + { + get { return true; } + } + + /** <inheritDoc /> */ + public override bool CanWrite + { + get { return true; } + } + + /** <inheritDoc /> */ + public override bool CanSeek + { + get { return false; } + } + + /** <inheritDoc /> */ + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException("Stream is not seekable."); + } + + /** <inheritDoc /> */ + public override long Position + { + get + { + throw new NotSupportedException("Stream is not seekable."); + } + set + { + throw new NotSupportedException("Stream is not seekable."); + } + } + + /** <inheritDoc /> */ + public override long Length + { + get + { + throw new NotSupportedException("Stream is not seekable."); + } + } + + /** <inheritDoc /> */ + public override void SetLength(long value) + { + throw new NotSupportedException("Stream is not seekable."); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/IPortableMetadataHandler.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/IPortableMetadataHandler.cs 
b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/IPortableMetadataHandler.cs new file mode 100644 index 0000000..dc3090f --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/IPortableMetadataHandler.cs @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable.Metadata +{ + using System.Collections.Generic; + + /// <summary> + /// Portable metadata handler. + /// </summary> + public interface IPortableMetadataHandler + { + /// <summary> + /// Callback invoked when named field is written. + /// </summary> + /// <param name="fieldId">Field ID.</param> + /// <param name="fieldName">Field name.</param> + /// <param name="typeId">Field type ID.</param> + void OnFieldWrite(int fieldId, string fieldName, int typeId); + + /// <summary> + /// Callback invoked when object write is finished and it is time to collect missing metadata. 
+ /// </summary> + /// <returns>Collected metadata.</returns> + IDictionary<string, int> OnObjectWriteFinished(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableHashsetMetadataHandler.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableHashsetMetadataHandler.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableHashsetMetadataHandler.cs new file mode 100644 index 0000000..8df5f36 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableHashsetMetadataHandler.cs @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable.Metadata +{ + using System.Collections.Generic; + + /// <summary> + /// Metadata handler which uses hash set to determine whether field was already written or not. + /// </summary> + internal class PortableHashsetMetadataHandler : IPortableMetadataHandler + { + /** Empty fields collection. 
*/ + private static readonly IDictionary<string, int> EmptyFields = new Dictionary<string, int>(); + + /** IDs known when serialization starts. */ + private readonly ICollection<int> _ids; + + /** New fields. */ + private IDictionary<string, int> _fieldMap; + + /** Whether the type is new, i.e. its metadata has not been saved yet. */ + private readonly bool _newType; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="ids">IDs.</param> + /// <param name="newType">True if metadata for the type has not been saved yet.</param> + public PortableHashsetMetadataHandler(ICollection<int> ids, bool newType) + { + _ids = ids; + _newType = newType; + } + + /** <inheritdoc /> */ + public void OnFieldWrite(int fieldId, string fieldName, int typeId) + { + if (!_ids.Contains(fieldId)) + { + if (_fieldMap == null) + _fieldMap = new Dictionary<string, int>(); + + if (!_fieldMap.ContainsKey(fieldName)) + _fieldMap[fieldName] = typeId; + } + } + + /** <inheritdoc /> */ + public IDictionary<string, int> OnObjectWriteFinished() + { + // Collected fields if any; otherwise the shared empty map for a new type, or null. + return _fieldMap ?? (_newType ? EmptyFields : null); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableMetadataHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableMetadataHolder.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableMetadataHolder.cs new file mode 100644 index 0000000..a3fa90f --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableMetadataHolder.cs @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable.Metadata +{ + using System; + using System.Collections.Generic; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Metadata for particular type. + /// </summary> + internal class PortableMetadataHolder + { + /** Type ID. */ + private readonly int _typeId; + + /** Type name. */ + private readonly string _typeName; + + /** Affinity key field name. */ + private readonly string _affKeyFieldName; + + /** Empty metadata used when nothing is known about object fields yet. */ + private readonly IPortableMetadata _emptyMeta; + + /** Collection of known field IDs. */ + private volatile ICollection<int> _ids; + + /** Last known unmodifiable metadata which is given to the user. */ + private volatile PortableMetadataImpl _meta; + + /** Saved flag (set if type metadata was saved at least once). */ + private volatile bool _saved; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="typeId">Type ID.</param> + /// <param name="typeName">Type name.</param> + /// <param name="affKeyFieldName">Affinity key field name.</param> + public PortableMetadataHolder(int typeId, string typeName, string affKeyFieldName) + { + _typeId = typeId; + _typeName = typeName; + _affKeyFieldName = affKeyFieldName; + + _emptyMeta = new PortableMetadataImpl(typeId, typeName, null, affKeyFieldName); + } + + /// <summary> + /// Get saved flag. 
+ /// </summary> + /// <returns>True if type metadata was saved at least once.</returns> + public bool Saved() + { + return _saved; + } + + /// <summary> + /// Get current type metadata. + /// </summary> + /// <returns>Last merged type metadata, or the empty metadata if nothing was merged yet.</returns> + public IPortableMetadata Metadata() + { + // Read the volatile field once so the null check and the returned value agree. + PortableMetadataImpl meta0 = _meta; + + return meta0 != null ? meta0 : _emptyMeta; + } + + /// <summary> + /// Currently cached field IDs. The collection is created lazily on first access. + /// </summary> + /// <returns>Cached field IDs, never null.</returns> + public ICollection<int> FieldIds() + { + ICollection<int> ids0 = _ids; + + // Test the snapshot, not the field: if another thread publishes _ids between the two reads, + // checking the field would skip the lock and return the stale null snapshot. + if (ids0 == null) + { + lock (this) + { + ids0 = _ids; + + if (ids0 == null) + { + ids0 = new HashSet<int>(); + + _ids = ids0; + } + } + } + + return ids0; + } + + /// <summary> + /// Merge newly sent field metadatas into existing ones. + /// </summary> + /// <param name="newMap">New field metadatas map.</param> + public void Merge(IDictionary<int, Tuple<string, int>> newMap) + { + _saved = true; + + if (newMap == null || newMap.Count == 0) + return; + + lock (this) + { + // 1. Create copies of the old meta. + ICollection<int> ids0 = _ids; + PortableMetadataImpl meta0 = _meta; + + ICollection<int> newIds = ids0 != null ? new HashSet<int>(ids0) : new HashSet<int>(); + + IDictionary<string, int> newFields = meta0 != null ? + new Dictionary<string, int>(meta0.FieldsMap()) : new Dictionary<string, int>(newMap.Count); + + // 2. Add new fields. + foreach (KeyValuePair<int, Tuple<string, int>> newEntry in newMap) + { + if (!newIds.Contains(newEntry.Key)) + newIds.Add(newEntry.Key); + + if (!newFields.ContainsKey(newEntry.Value.Item1)) + newFields[newEntry.Value.Item1] = newEntry.Value.Item2; + } + + // 3. Assign new meta. Order is important here: meta must be assigned before field IDs. 
+ _meta = new PortableMetadataImpl(_typeId, _typeName, newFields, _affKeyFieldName); + _ids = newIds; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableMetadataImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableMetadataImpl.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableMetadataImpl.cs new file mode 100644 index 0000000..88b40ad --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/Metadata/PortableMetadataImpl.cs @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable.Metadata +{ + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using Apache.Ignite.Core.Portable; + + /// <summary> + /// Portable metadata implementation. + /// </summary> + internal class PortableMetadataImpl : IPortableMetadata + { + /** Empty metadata. 
*/ + [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")] + public static readonly PortableMetadataImpl EmptyMeta = + new PortableMetadataImpl(PortableUtils.TypeObject, PortableTypeNames.TypeNameObject, null, null); + + /** Empty dictionary. */ + private static readonly IDictionary<string, int> EmptyDict = new Dictionary<string, int>(); + + /** Empty list. */ + private static readonly ICollection<string> EmptyList = new List<string>().AsReadOnly(); + + /** Fields. */ + private readonly IDictionary<string, int> _fields; + + /// <summary> + /// Get type name by type ID. + /// </summary> + /// <param name="typeId">Type ID.</param> + /// <returns>Type name.</returns> + private static string ConvertTypeName(int typeId) + { + switch (typeId) + { + case PortableUtils.TypeBool: + return PortableTypeNames.TypeNameBool; + case PortableUtils.TypeByte: + return PortableTypeNames.TypeNameByte; + case PortableUtils.TypeShort: + return PortableTypeNames.TypeNameShort; + case PortableUtils.TypeChar: + return PortableTypeNames.TypeNameChar; + case PortableUtils.TypeInt: + return PortableTypeNames.TypeNameInt; + case PortableUtils.TypeLong: + return PortableTypeNames.TypeNameLong; + case PortableUtils.TypeFloat: + return PortableTypeNames.TypeNameFloat; + case PortableUtils.TypeDouble: + return PortableTypeNames.TypeNameDouble; + case PortableUtils.TypeDecimal: + return PortableTypeNames.TypeNameDecimal; + case PortableUtils.TypeString: + return PortableTypeNames.TypeNameString; + case PortableUtils.TypeGuid: + return PortableTypeNames.TypeNameGuid; + case PortableUtils.TypeDate: + return PortableTypeNames.TypeNameDate; + case PortableUtils.TypeEnum: + return PortableTypeNames.TypeNameEnum; + case PortableUtils.TypePortable: + case PortableUtils.TypeObject: + return PortableTypeNames.TypeNameObject; + case PortableUtils.TypeArrayBool: + return PortableTypeNames.TypeNameArrayBool; + case PortableUtils.TypeArrayByte: + return 
PortableTypeNames.TypeNameArrayByte; + case PortableUtils.TypeArrayShort: + return PortableTypeNames.TypeNameArrayShort; + case PortableUtils.TypeArrayChar: + return PortableTypeNames.TypeNameArrayChar; + case PortableUtils.TypeArrayInt: + return PortableTypeNames.TypeNameArrayInt; + case PortableUtils.TypeArrayLong: + return PortableTypeNames.TypeNameArrayLong; + case PortableUtils.TypeArrayFloat: + return PortableTypeNames.TypeNameArrayFloat; + case PortableUtils.TypeArrayDouble: + return PortableTypeNames.TypeNameArrayDouble; + case PortableUtils.TypeArrayDecimal: + return PortableTypeNames.TypeNameArrayDecimal; + case PortableUtils.TypeArrayString: + return PortableTypeNames.TypeNameArrayString; + case PortableUtils.TypeArrayGuid: + return PortableTypeNames.TypeNameArrayGuid; + case PortableUtils.TypeArrayDate: + return PortableTypeNames.TypeNameArrayDate; + case PortableUtils.TypeArrayEnum: + return PortableTypeNames.TypeNameArrayEnum; + case PortableUtils.TypeArray: + return PortableTypeNames.TypeNameArrayObject; + case PortableUtils.TypeCollection: + return PortableTypeNames.TypeNameCollection; + case PortableUtils.TypeDictionary: + return PortableTypeNames.TypeNameMap; + default: + throw new PortableException("Invalid type ID: " + typeId); + } + } + + /// <summary> + /// Initializes a new instance of the <see cref="PortableMetadataImpl" /> class. + /// </summary> + /// <param name="reader">The reader.</param> + public PortableMetadataImpl(IPortableRawReader reader) + { + TypeId = reader.ReadInt(); + TypeName = reader.ReadString(); + AffinityKeyFieldName = reader.ReadString(); + _fields = reader.ReadGenericDictionary<string, int>(); + } + + /// <summary> + /// Constructor. 
+ /// </summary> + /// <param name="typeId">Type ID.</param> + /// <param name="typeName">Type name.</param> + /// <param name="fields">Fields.</param> + /// <param name="affKeyFieldName">Affinity key field name.</param> + public PortableMetadataImpl(int typeId, string typeName, IDictionary<string, int> fields, + string affKeyFieldName) + { + TypeId = typeId; + TypeName = typeName; + AffinityKeyFieldName = affKeyFieldName; + _fields = fields; + } + + /// <summary> + /// Type ID. + /// </summary> + /// <returns></returns> + public int TypeId { get; private set; } + + /// <summary> + /// Gets type name. + /// </summary> + public string TypeName { get; private set; } + + /// <summary> + /// Gets field names for that type. + /// </summary> + public ICollection<string> Fields + { + get { return _fields != null ? _fields.Keys : EmptyList; } + } + + /// <summary> + /// Gets field type for the given field name. + /// </summary> + /// <param name="fieldName">Field name.</param> + /// <returns> + /// Field type name, or null if this metadata holds no fields map. + /// </returns> + /// <exception cref="PortableException">If a fields map is present but has no such field.</exception> + public string FieldTypeName(string fieldName) + { + if (_fields != null) + { + int typeId; + + // Do not ignore the lookup result: a missing field would otherwise reach ConvertTypeName + // with the default type ID 0 and be reported as a misleading "Invalid type ID: 0". + if (!_fields.TryGetValue(fieldName, out typeId)) + throw new PortableException("Field does not exist in type metadata: " + fieldName); + + return ConvertTypeName(typeId); + } + + return null; + } + + /// <summary> + /// Gets optional affinity key field name. + /// </summary> + public string AffinityKeyFieldName { get; private set; } + + /// <summary> + /// Gets fields map. + /// </summary> + /// <returns>Fields map.</returns> + public IDictionary<string, int> FieldsMap() + { + return _fields ?? 
EmptyDict; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5cec202c/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderField.cs ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderField.cs b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderField.cs new file mode 100644 index 0000000..026d0d4 --- /dev/null +++ b/modules/platform/src/main/dotnet/Apache.Ignite.Core/Impl/Portable/PortableBuilderField.cs @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Portable +{ + using System; + + /// <summary> + /// Portable builder field. + /// </summary> + internal class PortableBuilderField + { + /** Remove marker object. */ + public static readonly object RmvMarkerObj = new object(); + + /** Remove marker. */ + public static readonly PortableBuilderField RmvMarker = + new PortableBuilderField(null, RmvMarkerObj); + + /** Type. */ + private readonly Type _typ; + + /** Value. */ + private readonly object _val; + + /// <summary> + /// Constructor. 
+ /// </summary> + /// <param name="typ">Type.</param> + /// <param name="val">Value.</param> + public PortableBuilderField(Type typ, object val) + { + _typ = typ; + _val = val; + } + + /// <summary> + /// Type. + /// </summary> + public Type Type + { + get + { + return _typ; + } + } + + /// <summary> + /// Value. + /// </summary> + public object Value + { + get + { + return _val; + } + } + } +}
