http://git-wip-us.apache.org/repos/asf/ignite/blob/894057e5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs new file mode 100644 index 0000000..91b8717 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/BinaryStreamBase.cs @@ -0,0 +1,1253 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Binary.IO
{
    using System;
    using System.IO;
    using System.Text;
    using Apache.Ignite.Core.Impl.Memory;

    /// <summary>
    /// Base class for managed and unmanaged data streams.
    /// <para />
    /// The static <c>*0</c> helpers encode multi-byte values in little-endian byte order:
    /// on a little-endian host the value is copied as is, on a big-endian host the bytes
    /// of every element are reversed while copying.
    /// </summary>
    internal unsafe abstract class BinaryStreamBase : IBinaryStream
    {
        /** Byte: zero. */
        private const byte ByteZero = 0;

        /** Byte: one. */
        private const byte ByteOne = 1;

        /** LITTLE_ENDIAN flag; when false every multi-byte element is byte-swapped. */
        private static readonly bool LittleEndian = BitConverter.IsLittleEndian;

        /** Position. */
        protected int Pos;

        /** Disposed flag. */
        private bool _disposed;

        /// <summary>Write byte.</summary>
        /// <param name="val">Byte value.</param>
        public abstract void WriteByte(byte val);

        /// <summary>Read byte.</summary>
        /// <returns>Byte value.</returns>
        public abstract byte ReadByte();

        /// <summary>Write byte array.</summary>
        /// <param name="val">Byte array.</param>
        public abstract void WriteByteArray(byte[] val);

        /// <summary>
        /// Internal routine to write byte array: copies <c>val.Length</c> bytes to <paramref name="data"/>.
        /// </summary>
        /// <param name="val">Byte array.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected static void WriteByteArray0(byte[] val, byte* data)
        {
            fixed (byte* val0 = val)
            {
                CopyMemory(val0, data, val.Length);
            }
        }

        /// <summary>Read byte array.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Byte array.</returns>
        public abstract byte[] ReadByteArray(int cnt);

        /// <summary>
        /// Internal routine to read byte array: allocates a new array and fills it from <paramref name="data"/>.
        /// </summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Byte array.</returns>
        protected static byte[] ReadByteArray0(int len, byte* data)
        {
            byte[] res = new byte[len];

            fixed (byte* res0 = res)
            {
                CopyMemory(data, res0, len);
            }

            return res;
        }

        /// <summary>Write bool as a single byte (1 for <c>true</c>, 0 for <c>false</c>).</summary>
        /// <param name="val">Bool value.</param>
        public void WriteBool(bool val)
        {
            WriteByte(val ? ByteOne : ByteZero);
        }

        /// <summary>Read bool; only a byte equal to 1 is treated as <c>true</c>.</summary>
        /// <returns>Bool value.</returns>
        public bool ReadBool()
        {
            return ReadByte() == ByteOne;
        }

        /// <summary>Write bool array.</summary>
        /// <param name="val">Bool array.</param>
        public abstract void WriteBoolArray(bool[] val);

        /// <summary>
        /// Internal routine to write bool array: copies one byte per element to <paramref name="data"/>.
        /// </summary>
        /// <param name="val">Bool array.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected static void WriteBoolArray0(bool[] val, byte* data)
        {
            fixed (bool* val0 = val)
            {
                CopyMemory((byte*)val0, data, val.Length);
            }
        }

        /// <summary>Read bool array.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Bool array.</returns>
        public abstract bool[] ReadBoolArray(int cnt);

        /// <summary>
        /// Internal routine to read bool array: copies one byte per element from <paramref name="data"/>.
        /// </summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Bool array.</returns>
        protected static bool[] ReadBoolArray0(int len, byte* data)
        {
            bool[] res = new bool[len];

            fixed (bool* res0 = res)
            {
                CopyMemory(data, (byte*)res0, len);
            }

            return res;
        }

        /// <summary>Write short.</summary>
        /// <param name="val">Short value.</param>
        public abstract void WriteShort(short val);

        /// <summary>Internal routine to write short value.</summary>
        /// <param name="val">Short value.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected static void WriteShort0(short val, byte* data)
        {
            if (LittleEndian)
            {
                *((short*)data) = val;
            }
            else
            {
                CopySwapped((byte*)&val, data, sizeof(short), 1);
            }
        }

        /// <summary>Read short.</summary>
        /// <returns>Short value.</returns>
        public abstract short ReadShort();

        /// <summary>Internal routine to read short value.</summary>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Short value.</returns>
        protected static short ReadShort0(byte* data)
        {
            short val;

            if (LittleEndian)
            {
                val = *((short*)data);
            }
            else
            {
                CopySwapped(data, (byte*)&val, sizeof(short), 1);
            }

            return val;
        }

        /// <summary>Write short array.</summary>
        /// <param name="val">Short array.</param>
        public abstract void WriteShortArray(short[] val);

        /// <summary>Internal routine to write short array.</summary>
        /// <param name="val">Short array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        protected static void WriteShortArray0(short[] val, byte* data, int cnt)
        {
            fixed (short* val0 = val)
            {
                if (LittleEndian)
                {
                    CopyMemory((byte*)val0, data, cnt);
                }
                else
                {
                    CopySwapped((byte*)val0, data, sizeof(short), val.Length);
                }
            }
        }

        /// <summary>Read short array.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Short array.</returns>
        public abstract short[] ReadShortArray(int cnt);

        /// <summary>Internal routine to read short array.</summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        /// <returns>Short array.</returns>
        protected static short[] ReadShortArray0(int len, byte* data, int cnt)
        {
            short[] res = new short[len];

            fixed (short* res0 = res)
            {
                if (LittleEndian)
                {
                    CopyMemory(data, (byte*)res0, cnt);
                }
                else
                {
                    CopySwapped(data, (byte*)res0, sizeof(short), len);
                }
            }

            return res;
        }

        /// <summary>Write char; its 16 bits are reinterpreted as a short and passed to <c>WriteShort</c>.</summary>
        /// <param name="val">Char value.</param>
        public void WriteChar(char val)
        {
            WriteShort(*(short*)(&val));
        }

        /// <summary>Read char; reads a short and reinterprets its 16 bits as a char.</summary>
        /// <returns>Char value.</returns>
        public char ReadChar()
        {
            short val = ReadShort();

            return *(char*)(&val);
        }

        /// <summary>Write char array.</summary>
        /// <param name="val">Char array.</param>
        public abstract void WriteCharArray(char[] val);

        /// <summary>Internal routine to write char array.</summary>
        /// <param name="val">Char array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        protected static void WriteCharArray0(char[] val, byte* data, int cnt)
        {
            fixed (char* val0 = val)
            {
                if (LittleEndian)
                {
                    CopyMemory((byte*)val0, data, cnt);
                }
                else
                {
                    CopySwapped((byte*)val0, data, sizeof(char), val.Length);
                }
            }
        }

        /// <summary>Read char array.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Char array.</returns>
        public abstract char[] ReadCharArray(int cnt);

        /// <summary>Internal routine to read char array.</summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        /// <returns>Char array.</returns>
        protected static char[] ReadCharArray0(int len, byte* data, int cnt)
        {
            char[] res = new char[len];

            fixed (char* res0 = res)
            {
                if (LittleEndian)
                {
                    CopyMemory(data, (byte*)res0, cnt);
                }
                else
                {
                    CopySwapped(data, (byte*)res0, sizeof(char), len);
                }
            }

            return res;
        }

        /// <summary>Write int.</summary>
        /// <param name="val">Int value.</param>
        public abstract void WriteInt(int val);

        /// <summary>Write int to specific position.</summary>
        /// <param name="writePos">Position.</param>
        /// <param name="val">Value.</param>
        public abstract void WriteInt(int writePos, int val);

        /// <summary>Internal routine to write int value.</summary>
        /// <param name="val">Int value.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected static void WriteInt0(int val, byte* data)
        {
            if (LittleEndian)
            {
                *((int*)data) = val;
            }
            else
            {
                CopySwapped((byte*)&val, data, sizeof(int), 1);
            }
        }

        /// <summary>Read int.</summary>
        /// <returns>Int value.</returns>
        public abstract int ReadInt();

        /// <summary>Internal routine to read int value.</summary>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Int value.</returns>
        protected static int ReadInt0(byte* data)
        {
            int val;

            if (LittleEndian)
            {
                val = *((int*)data);
            }
            else
            {
                CopySwapped(data, (byte*)&val, sizeof(int), 1);
            }

            return val;
        }

        /// <summary>Write int array.</summary>
        /// <param name="val">Int array.</param>
        public abstract void WriteIntArray(int[] val);

        /// <summary>Internal routine to write int array.</summary>
        /// <param name="val">Int array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        protected static void WriteIntArray0(int[] val, byte* data, int cnt)
        {
            fixed (int* val0 = val)
            {
                if (LittleEndian)
                {
                    CopyMemory((byte*)val0, data, cnt);
                }
                else
                {
                    CopySwapped((byte*)val0, data, sizeof(int), val.Length);
                }
            }
        }

        /// <summary>Read int array.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Int array.</returns>
        public abstract int[] ReadIntArray(int cnt);

        /// <summary>Internal routine to read int array.</summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        /// <returns>Int array.</returns>
        protected static int[] ReadIntArray0(int len, byte* data, int cnt)
        {
            int[] res = new int[len];

            fixed (int* res0 = res)
            {
                if (LittleEndian)
                {
                    CopyMemory(data, (byte*)res0, cnt);
                }
                else
                {
                    CopySwapped(data, (byte*)res0, sizeof(int), len);
                }
            }

            return res;
        }

        /// <summary>Write float; its 32 bits are reinterpreted as an int and passed to <c>WriteInt</c>.</summary>
        /// <param name="val">Float value.</param>
        public void WriteFloat(float val)
        {
            int val0 = *(int*)(&val);

            WriteInt(val0);
        }

        /// <summary>Read float; reads an int and reinterprets its 32 bits as a float.</summary>
        /// <returns>Float value.</returns>
        public float ReadFloat()
        {
            int val = ReadInt();

            return *(float*)(&val);
        }

        /// <summary>Write float array.</summary>
        /// <param name="val">Float array.</param>
        public abstract void WriteFloatArray(float[] val);

        /// <summary>Internal routine to write float array.</summary>
        /// <param name="val">Float array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        protected static void WriteFloatArray0(float[] val, byte* data, int cnt)
        {
            fixed (float* val0 = val)
            {
                if (LittleEndian)
                {
                    CopyMemory((byte*)val0, data, cnt);
                }
                else
                {
                    CopySwapped((byte*)val0, data, sizeof(float), val.Length);
                }
            }
        }

        /// <summary>Read float array.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Float array.</returns>
        public abstract float[] ReadFloatArray(int cnt);

        /// <summary>Internal routine to read float array.</summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        /// <returns>Float array.</returns>
        protected static float[] ReadFloatArray0(int len, byte* data, int cnt)
        {
            float[] res = new float[len];

            fixed (float* res0 = res)
            {
                if (LittleEndian)
                {
                    CopyMemory(data, (byte*)res0, cnt);
                }
                else
                {
                    // The swapped bytes are written straight into the float storage. Building
                    // an int first and assigning it to a float element would convert the number
                    // (int -> float) instead of reinterpreting its bits.
                    CopySwapped(data, (byte*)res0, sizeof(float), len);
                }
            }

            return res;
        }

        /// <summary>Write long.</summary>
        /// <param name="val">Long value.</param>
        public abstract void WriteLong(long val);

        /// <summary>Internal routine to write long value.</summary>
        /// <param name="val">Long value.</param>
        /// <param name="data">Data pointer (destination).</param>
        protected static void WriteLong0(long val, byte* data)
        {
            if (LittleEndian)
            {
                *((long*)data) = val;
            }
            else
            {
                CopySwapped((byte*)&val, data, sizeof(long), 1);
            }
        }

        /// <summary>Read long.</summary>
        /// <returns>Long value.</returns>
        public abstract long ReadLong();

        /// <summary>Internal routine to read long value.</summary>
        /// <param name="data">Data pointer (source).</param>
        /// <returns>Long value.</returns>
        protected static long ReadLong0(byte* data)
        {
            long val;

            if (LittleEndian)
            {
                val = *((long*)data);
            }
            else
            {
                CopySwapped(data, (byte*)&val, sizeof(long), 1);
            }

            return val;
        }

        /// <summary>Write long array.</summary>
        /// <param name="val">Long array.</param>
        public abstract void WriteLongArray(long[] val);

        /// <summary>Internal routine to write long array.</summary>
        /// <param name="val">Long array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        protected static void WriteLongArray0(long[] val, byte* data, int cnt)
        {
            fixed (long* val0 = val)
            {
                if (LittleEndian)
                {
                    CopyMemory((byte*)val0, data, cnt);
                }
                else
                {
                    CopySwapped((byte*)val0, data, sizeof(long), val.Length);
                }
            }
        }

        /// <summary>Read long array.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Long array.</returns>
        public abstract long[] ReadLongArray(int cnt);

        /// <summary>Internal routine to read long array.</summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        /// <returns>Long array.</returns>
        protected static long[] ReadLongArray0(int len, byte* data, int cnt)
        {
            long[] res = new long[len];

            fixed (long* res0 = res)
            {
                if (LittleEndian)
                {
                    CopyMemory(data, (byte*)res0, cnt);
                }
                else
                {
                    CopySwapped(data, (byte*)res0, sizeof(long), len);
                }
            }

            return res;
        }

        /// <summary>Write double; its 64 bits are reinterpreted as a long and passed to <c>WriteLong</c>.</summary>
        /// <param name="val">Double value.</param>
        public void WriteDouble(double val)
        {
            long val0 = *(long*)(&val);

            WriteLong(val0);
        }

        /// <summary>Read double; reads a long and reinterprets its 64 bits as a double.</summary>
        /// <returns>Double value.</returns>
        public double ReadDouble()
        {
            long val = ReadLong();

            return *(double*)(&val);
        }

        /// <summary>Write double array.</summary>
        /// <param name="val">Double array.</param>
        public abstract void WriteDoubleArray(double[] val);

        /// <summary>Internal routine to write double array.</summary>
        /// <param name="val">Double array.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        protected static void WriteDoubleArray0(double[] val, byte* data, int cnt)
        {
            fixed (double* val0 = val)
            {
                if (LittleEndian)
                {
                    CopyMemory((byte*)val0, data, cnt);
                }
                else
                {
                    CopySwapped((byte*)val0, data, sizeof(double), val.Length);
                }
            }
        }

        /// <summary>Read double array.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Double array.</returns>
        public abstract double[] ReadDoubleArray(int cnt);

        /// <summary>Internal routine to read double array.</summary>
        /// <param name="len">Array length.</param>
        /// <param name="data">Data pointer (source).</param>
        /// <param name="cnt">Bytes count; used by the little-endian bulk copy only.</param>
        /// <returns>Double array.</returns>
        protected static double[] ReadDoubleArray0(int len, byte* data, int cnt)
        {
            double[] res = new double[len];

            fixed (double* res0 = res)
            {
                if (LittleEndian)
                {
                    CopyMemory(data, (byte*)res0, cnt);
                }
                else
                {
                    CopySwapped(data, (byte*)res0, sizeof(double), len);
                }
            }

            return res;
        }

        /// <summary>Write string.</summary>
        /// <param name="chars">Characters.</param>
        /// <param name="charCnt">Char count.</param>
        /// <param name="byteCnt">Byte count.</param>
        /// <param name="encoding">Encoding.</param>
        /// <returns>Amount of bytes written.</returns>
        public abstract int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding);

        /// <summary>
        /// Internal string write routine: encodes the characters directly into <paramref name="data"/>
        /// via <c>Encoding.GetBytes</c>.
        /// </summary>
        /// <param name="chars">Chars.</param>
        /// <param name="charCnt">Chars count.</param>
        /// <param name="byteCnt">Bytes count, passed to the encoder as the destination size.</param>
        /// <param name="enc">Encoding.</param>
        /// <param name="data">Data pointer (destination).</param>
        /// <returns>Amount of bytes written.</returns>
        protected static int WriteString0(char* chars, int charCnt, int byteCnt, Encoding enc, byte* data)
        {
            return enc.GetBytes(chars, charCnt, data, byteCnt);
        }

        /// <summary>
        /// Write arbitrary data. The array is pinned and forwarded to <c>Write(byte*, int)</c>;
        /// <paramref name="off"/> and <paramref name="cnt"/> are not validated here.
        /// </summary>
        /// <param name="src">Source array.</param>
        /// <param name="off">Offset.</param>
        /// <param name="cnt">Count.</param>
        public void Write(byte[] src, int off, int cnt)
        {
            fixed (byte* src0 = src)
            {
                Write(src0 + off, cnt);
            }
        }

        /// <summary>
        /// Read arbitrary data. The array is pinned and forwarded to <c>Read(byte*, int)</c>;
        /// <paramref name="off"/> and <paramref name="cnt"/> are not validated here.
        /// </summary>
        /// <param name="dest">Destination array.</param>
        /// <param name="off">Offset.</param>
        /// <param name="cnt">Count.</param>
        public void Read(byte[] dest, int off, int cnt)
        {
            fixed (byte* dest0 = dest)
            {
                Read(dest0 + off, cnt);
            }
        }

        /// <summary>Write arbitrary data.</summary>
        /// <param name="src">Source.</param>
        /// <param name="cnt">Count.</param>
        public abstract void Write(byte* src, int cnt);

        /// <summary>
        /// Internal write routine: copies <paramref name="cnt"/> bytes to <paramref name="data"/>
        /// at offset <c>Pos</c>. The position itself is not changed by this method.
        /// </summary>
        /// <param name="src">Source.</param>
        /// <param name="cnt">Count.</param>
        /// <param name="data">Data (destination).</param>
        protected void WriteInternal(byte* src, int cnt, byte* data)
        {
            CopyMemory(src, data + Pos, cnt);
        }

        /// <summary>Read arbitrary data.</summary>
        /// <param name="dest">Destination.</param>
        /// <param name="cnt">Count.</param>
        public abstract void Read(byte* dest, int cnt);

        /// <summary>
        /// Internal read routine: copies at most <c>Remaining</c> bytes (and no more than
        /// <paramref name="cnt"/>) from <paramref name="src"/> at offset <c>Pos</c>, then advances
        /// the position by the number of bytes actually copied.
        /// </summary>
        /// <param name="src">Source.</param>
        /// <param name="dest">Destination.</param>
        /// <param name="cnt">Count.</param>
        protected void ReadInternal(byte* src, byte* dest, int cnt)
        {
            int cnt0 = Math.Min(Remaining, cnt);

            CopyMemory(src + Pos, dest, cnt0);

            ShiftRead(cnt0);
        }

        /// <summary>Position.</summary>
        public int Position
        {
            get { return Pos; }
        }

        /// <summary>Gets remaining bytes in the stream.</summary>
        /// <value>Remaining bytes.</value>
        public abstract int Remaining { get; }

        /// <summary>Gets underlying array, avoiding copying if possible.</summary>
        /// <returns>Underlying array.</returns>
        public abstract byte[] GetArray();

        /// <summary>Gets underlying data in a new array.</summary>
        /// <returns>New array with data.</returns>
        public abstract byte[] GetArrayCopy();

        /// <summary>
        /// Check whether array passed as argument is the same as the stream hosts.
        /// This base implementation always answers <c>false</c>.
        /// </summary>
        /// <param name="arr">Array.</param>
        /// <returns><c>True</c> if they are same.</returns>
        public virtual bool IsSameArray(byte[] arr)
        {
            return false;
        }

        /// <summary>
        /// Seek to the given position. Only <c>SeekOrigin.Begin</c> and <c>SeekOrigin.Current</c>
        /// are supported. Write capacity is ensured for the new position before it is applied.
        /// </summary>
        /// <param name="offset">Offset.</param>
        /// <param name="origin">Seek origin.</param>
        /// <returns>New position.</returns>
        /// <exception cref="System.ArgumentException">
        /// The seek origin is not supported, or the resulting position is negative.
        /// </exception>
        public int Seek(int offset, SeekOrigin origin)
        {
            int newPos;

            switch (origin)
            {
                case SeekOrigin.Begin:
                    newPos = offset;
                    break;

                case SeekOrigin.Current:
                    newPos = Pos + offset;
                    break;

                default:
                    throw new ArgumentException("Unsupported seek origin: " + origin);
            }

            if (newPos < 0)
            {
                throw new ArgumentException("Seek before origin: " + newPos);
            }

            EnsureWriteCapacity(newPos);

            Pos = newPos;

            return Pos;
        }

        /** <inheritdoc /> */
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            Dispose(true);

            GC.SuppressFinalize(this);

            _disposed = true;
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        /// <param name="disposing">Flag supplied by the caller; <c>Dispose()</c> passes <c>true</c>.</param>
        protected abstract void Dispose(bool disposing);

        /// <summary>Ensure capacity for write.</summary>
        /// <param name="cnt">Bytes count.</param>
        protected abstract void EnsureWriteCapacity(int cnt);

        /// <summary>
        /// Ensure capacity for write and shift position. Capacity is requested for
        /// <c>Pos + cnt</c> bytes.
        /// </summary>
        /// <param name="cnt">Bytes count.</param>
        /// <returns>Position before shift.</returns>
        protected int EnsureWriteCapacityAndShift(int cnt)
        {
            int pos0 = Pos;

            EnsureWriteCapacity(Pos + cnt);

            ShiftWrite(cnt);

            return pos0;
        }

        /// <summary>Ensure capacity for read.</summary>
        /// <param name="cnt">Bytes count.</param>
        protected abstract void EnsureReadCapacity(int cnt);

        /// <summary>
        /// Ensure capacity for read and shift position. <paramref name="cnt"/> is passed to
        /// <c>EnsureReadCapacity</c> unchanged (the current position is not added).
        /// </summary>
        /// <param name="cnt">Bytes count.</param>
        /// <returns>Position before shift.</returns>
        protected int EnsureReadCapacityAndShift(int cnt)
        {
            int pos0 = Pos;

            EnsureReadCapacity(cnt);

            ShiftRead(cnt);

            return pos0;
        }

        /// <summary>Shift position due to write.</summary>
        /// <param name="cnt">Bytes count.</param>
        protected void ShiftWrite(int cnt)
        {
            Pos += cnt;
        }

        /// <summary>Shift position due to read.</summary>
        /// <param name="cnt">Bytes count.</param>
        private void ShiftRead(int cnt)
        {
            Pos += cnt;
        }

        /// <summary>
        /// Calculate new capacity: 256 when fewer than 256 bytes are required, otherwise the
        /// larger of twice the current capacity and the required capacity.
        /// </summary>
        /// <param name="curCap">Current capacity.</param>
        /// <param name="reqCap">Required capacity.</param>
        /// <returns>New capacity.</returns>
        protected static int Capacity(int curCap, int reqCap)
        {
            int newCap;

            if (reqCap < 256)
            {
                newCap = 256;
            }
            else
            {
                newCap = curCap << 1;

                if (newCap < reqCap)
                {
                    newCap = reqCap;
                }
            }

            return newCap;
        }

        /// <summary>Unsafe memory copy routine.</summary>
        /// <param name="src">Source.</param>
        /// <param name="dest">Destination.</param>
        /// <param name="len">Length.</param>
        private static void CopyMemory(byte* src, byte* dest, int len)
        {
            PlatformMemoryUtils.CopyMemory(src, dest, len);
        }

        /// <summary>
        /// Copies <paramref name="count"/> elements of <paramref name="elemSize"/> bytes each,
        /// reversing the byte order inside every element. Used on big-endian hosts only.
        /// </summary>
        /// <param name="src">Source.</param>
        /// <param name="dest">Destination.</param>
        /// <param name="elemSize">Size of a single element in bytes.</param>
        /// <param name="count">Number of elements.</param>
        private static void CopySwapped(byte* src, byte* dest, int elemSize, int count)
        {
            for (int i = 0; i < count; i++)
            {
                for (int j = 0; j < elemSize; j++)
                {
                    dest[j] = src[elemSize - 1 - j];
                }

                src += elemSize;
                dest += elemSize;
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/894057e5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/IBinaryStream.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/IBinaryStream.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/IBinaryStream.cs new file mode 100644 index 0000000..d530713 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Io/IBinaryStream.cs @@ -0,0 +1,322 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Core.Impl.Binary.IO
{
    using System;
    using System.Diagnostics.CodeAnalysis;
    using System.IO;
    using System.Text;

    /// <summary>
    /// Stream capable of working with binary objects.
    /// </summary>
    [CLSCompliant(false)]
    [SuppressMessage("Microsoft.Naming", "CA1711:IdentifiersShouldNotHaveIncorrectSuffix")]
    public unsafe interface IBinaryStream : IDisposable
    {
        /// <summary>Writes a single bool.</summary>
        /// <param name="val">Value to write.</param>
        void WriteBool(bool val);

        /// <summary>Reads a single bool.</summary>
        /// <returns>Value read.</returns>
        bool ReadBool();

        /// <summary>Writes an array of bools.</summary>
        /// <param name="val">Array to write.</param>
        void WriteBoolArray(bool[] val);

        /// <summary>Reads an array of bools.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Array read.</returns>
        bool[] ReadBoolArray(int cnt);

        /// <summary>Writes a single byte.</summary>
        /// <param name="val">Value to write.</param>
        void WriteByte(byte val);

        /// <summary>Reads a single byte.</summary>
        /// <returns>Value read.</returns>
        byte ReadByte();

        /// <summary>Writes an array of bytes.</summary>
        /// <param name="val">Array to write.</param>
        void WriteByteArray(byte[] val);

        /// <summary>Reads an array of bytes.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Array read.</returns>
        byte[] ReadByteArray(int cnt);

        /// <summary>Writes a single short.</summary>
        /// <param name="val">Value to write.</param>
        void WriteShort(short val);

        /// <summary>Reads a single short.</summary>
        /// <returns>Value read.</returns>
        short ReadShort();

        /// <summary>Writes an array of shorts.</summary>
        /// <param name="val">Array to write.</param>
        void WriteShortArray(short[] val);

        /// <summary>Reads an array of shorts.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Array read.</returns>
        short[] ReadShortArray(int cnt);

        /// <summary>Writes a single char.</summary>
        /// <param name="val">Value to write.</param>
        void WriteChar(char val);

        /// <summary>Reads a single char.</summary>
        /// <returns>Value read.</returns>
        char ReadChar();

        /// <summary>Writes an array of chars.</summary>
        /// <param name="val">Array to write.</param>
        void WriteCharArray(char[] val);

        /// <summary>Reads an array of chars.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Array read.</returns>
        char[] ReadCharArray(int cnt);

        /// <summary>Writes a single int.</summary>
        /// <param name="val">Value to write.</param>
        void WriteInt(int val);

        /// <summary>Writes a single int at the given position.</summary>
        /// <param name="writePos">Position to write at.</param>
        /// <param name="val">Value to write.</param>
        void WriteInt(int writePos, int val);

        /// <summary>Reads a single int.</summary>
        /// <returns>Value read.</returns>
        int ReadInt();

        /// <summary>Writes an array of ints.</summary>
        /// <param name="val">Array to write.</param>
        void WriteIntArray(int[] val);

        /// <summary>Reads an array of ints.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Array read.</returns>
        int[] ReadIntArray(int cnt);

        /// <summary>Writes a single long.</summary>
        /// <param name="val">Value to write.</param>
        void WriteLong(long val);

        /// <summary>Reads a single long.</summary>
        /// <returns>Value read.</returns>
        long ReadLong();

        /// <summary>Writes an array of longs.</summary>
        /// <param name="val">Array to write.</param>
        void WriteLongArray(long[] val);

        /// <summary>Reads an array of longs.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Array read.</returns>
        long[] ReadLongArray(int cnt);

        /// <summary>Writes a single float.</summary>
        /// <param name="val">Value to write.</param>
        void WriteFloat(float val);

        /// <summary>Reads a single float.</summary>
        /// <returns>Value read.</returns>
        float ReadFloat();

        /// <summary>Writes an array of floats.</summary>
        /// <param name="val">Array to write.</param>
        void WriteFloatArray(float[] val);

        /// <summary>Reads an array of floats.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Array read.</returns>
        float[] ReadFloatArray(int cnt);

        /// <summary>Writes a single double.</summary>
        /// <param name="val">Value to write.</param>
        void WriteDouble(double val);

        /// <summary>Reads a single double.</summary>
        /// <returns>Value read.</returns>
        double ReadDouble();

        /// <summary>Writes an array of doubles.</summary>
        /// <param name="val">Array to write.</param>
        void WriteDoubleArray(double[] val);

        /// <summary>Reads an array of doubles.</summary>
        /// <param name="cnt">Count.</param>
        /// <returns>Array read.</returns>
        double[] ReadDoubleArray(int cnt);

        /// <summary>Writes a string using the supplied encoding.</summary>
        /// <param name="chars">Pointer to the characters.</param>
        /// <param name="charCnt">Number of characters.</param>
        /// <param name="byteCnt">Number of bytes.</param>
        /// <param name="encoding">Encoding to use.</param>
        /// <returns>Amount of bytes written.</returns>
        int WriteString(char* chars, int charCnt, int byteCnt, Encoding encoding);

        /// <summary>Writes arbitrary data taken from an array.</summary>
        /// <param name="src">Source array.</param>
        /// <param name="off">Offset within the source array.</param>
        /// <param name="cnt">Count.</param>
        void Write(byte[] src, int off, int cnt);

        /// <summary>Reads arbitrary data into an array.</summary>
        /// <param name="dest">Destination array.</param>
        /// <param name="off">Offset within the destination array.</param>
        /// <param name="cnt">Count.</param>
        void Read(byte[] dest, int off, int cnt);

        /// <summary>Writes arbitrary data taken from a pointer.</summary>
        /// <param name="src">Source pointer.</param>
        /// <param name="cnt">Count.</param>
        void Write(byte* src, int cnt);

        /// <summary>Reads arbitrary data into a pointer.</summary>
        /// <param name="dest">Destination pointer.</param>
        /// <param name="cnt">Count.</param>
        void Read(byte* dest, int cnt);

        /// <summary>Position.</summary>
        int Position { get; }

        /// <summary>Gets remaining bytes in the stream.</summary>
        /// <value>Remaining bytes.</value>
        int Remaining { get; }

        /// <summary>Gets underlying array, avoiding copying if possible.</summary>
        /// <returns>Underlying array.</returns>
        byte[] GetArray();

        /// <summary>Gets underlying data in a new array.</summary>
        /// <returns>New array with data.</returns>
        byte[] GetArrayCopy();

        /// <summary>Checks whether the array passed as argument is the same as the one the stream hosts.</summary>
        /// <param name="arr">Array to compare.</param>
        /// <returns><c>True</c> if they are same.</returns>
        bool IsSameArray(byte[] arr);

        /// <summary>Seeks to the given position.</summary>
        /// <param name="offset">Offset.</param>
        /// <param name="origin">Seek origin.</param>
        /// <returns>Position.</returns>
        int Seek(int offset, SeekOrigin origin);
    }
}
 http://git-wip-us.apache.org/repos/asf/ignite/blob/894057e5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs new file mode 100644 index 0000000..251610e --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Binary +{ + using System; + using System.Collections.Generic; + using System.Globalization; + using System.Linq; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Impl.Binary.Metadata; + using Apache.Ignite.Core.Impl.Cache; + using Apache.Ignite.Core.Impl.Cache.Query.Continuous; + using Apache.Ignite.Core.Impl.Compute; + using Apache.Ignite.Core.Impl.Compute.Closure; + using Apache.Ignite.Core.Impl.Datastream; + using Apache.Ignite.Core.Impl.Messaging; + + /// <summary> + /// Marshaller implementation: registers system and user type descriptors and creates binary readers/writers over streams. + /// </summary> + internal class Marshaller + { + /** Binary configuration. */ + private readonly BinaryConfiguration _cfg; + + /** Type to descriptor map. */ + private readonly IDictionary<Type, IBinaryTypeDescriptor> _typeToDesc = + new Dictionary<Type, IBinaryTypeDescriptor>(); + + /** Type name to descriptor map (user types only). */ + private readonly IDictionary<string, IBinaryTypeDescriptor> _typeNameToDesc = + new Dictionary<string, IBinaryTypeDescriptor>(); + + /** Type key (see BinaryUtils.TypeKey) to descriptor map. */ + private readonly IDictionary<long, IBinaryTypeDescriptor> _idToDesc = + new Dictionary<long, IBinaryTypeDescriptor>(); + + /** Cached metadata holders keyed by type ID; the dictionary is replaced with a new copy when a holder is added. */ + private volatile IDictionary<int, BinaryTypeHolder> _metas = + new Dictionary<int, BinaryTypeHolder>(); + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="cfg">Configuration; a default one is created when null.</param> + public Marshaller(BinaryConfiguration cfg) + { + // Validation.
+ if (cfg == null) + cfg = new BinaryConfiguration(); + + if (cfg.TypeConfigurations == null) + cfg.TypeConfigurations = new List<BinaryTypeConfiguration>(); + + foreach (BinaryTypeConfiguration typeCfg in cfg.TypeConfigurations) + { + if (string.IsNullOrEmpty(typeCfg.TypeName)) + throw new BinaryObjectException("Type name cannot be null or empty: " + typeCfg); + } + + // 1. Define system types. They use internal reflective stuff, so configuration doesn't affect them. + AddSystemTypes(); + + // 2. Define user types. + // A single reflective serializer instance is shared by all user types when no default serializer is configured. + var dfltSerializer = cfg.DefaultSerializer == null ? new BinaryReflectiveSerializer() : null; + + var typeResolver = new TypeResolver(); + + ICollection<BinaryTypeConfiguration> typeCfgs = cfg.TypeConfigurations; + + if (typeCfgs != null) + foreach (BinaryTypeConfiguration typeCfg in typeCfgs) + AddUserType(cfg, typeCfg, typeResolver, dfltSerializer); + + ICollection<string> types = cfg.Types; + + if (types != null) + foreach (string type in types) + AddUserType(cfg, new BinaryTypeConfiguration(type), typeResolver, dfltSerializer); + + // Store the shared reflective serializer back into the (caller-supplied) configuration when none was set. + if (cfg.DefaultSerializer == null) + cfg.DefaultSerializer = dfltSerializer; + + _cfg = cfg; + } + + /// <summary> + /// Gets or sets the backing grid. + /// </summary> + public Ignite Ignite { get; set; } + + /// <summary> + /// Marshal object into a new byte array. + /// </summary> + /// <param name="val">Value.</param> + /// <returns>Serialized data as byte array.</returns> + public byte[] Marshal<T>(T val) + { + BinaryHeapStream stream = new BinaryHeapStream(128); + + Marshal(val, stream); + + return stream.GetArrayCopy(); + } + + /// <summary> + /// Marshal object into the given stream as a single start/write/finish marshal session. + /// </summary> + /// <param name="val">Value.</param> + /// <param name="stream">Output stream.</param> + private void Marshal<T>(T val, IBinaryStream stream) + { + BinaryWriter writer = StartMarshal(stream); + + writer.Write(val); + + FinishMarshal(writer); + } + + /// <summary> + /// Start marshal session.
+ /// </summary> + /// <param name="stream">Stream.</param> + /// <returns>Writer.</returns> + public BinaryWriter StartMarshal(IBinaryStream stream) + { + return new BinaryWriter(this, stream); + } + + /// <summary> + /// Finish marshal session: binary types collected by the writer are put to the grid + /// when a grid is attached and at least one type was collected. + /// </summary> + /// <param name="writer">Writer; cast to <see cref="BinaryWriter"/>.</param> + public void FinishMarshal(IBinaryWriter writer) + { + var meta = ((BinaryWriter) writer).GetBinaryTypes(); + + // Read the property once so the null check and the call use the same instance. + var ignite = Ignite; + + if (ignite != null && meta != null && meta.Count > 0) + ignite.PutBinaryTypes(meta); + } + + /// <summary> + /// Unmarshal object. + /// </summary> + /// <typeparam name="T">Type of the resulting object.</typeparam> + /// <param name="data">Data array.</param> + /// <param name="keepBinary">Whether to keep binarizable as binary.</param> + /// <returns> + /// Object. + /// </returns> + public T Unmarshal<T>(byte[] data, bool keepBinary) + { + return Unmarshal<T>(new BinaryHeapStream(data), keepBinary); + } + + /// <summary> + /// Unmarshal object. + /// </summary> + /// <param name="data">Data array.</param> + /// <param name="mode">The mode.</param> + /// <returns> + /// Object. + /// </returns> + public T Unmarshal<T>(byte[] data, BinaryMode mode = BinaryMode.Deserialize) + { + return Unmarshal<T>(new BinaryHeapStream(data), mode); + } + + /// <summary> + /// Unmarshal object. + /// </summary> + /// <param name="stream">Stream over underlying byte array with correct position.</param> + /// <param name="keepBinary">Whether to keep binary objects in binary form.</param> + /// <returns> + /// Object. + /// </returns> + public T Unmarshal<T>(IBinaryStream stream, bool keepBinary) + { + return Unmarshal<T>(stream, keepBinary ? BinaryMode.KeepBinary : BinaryMode.Deserialize, null); + } + + /// <summary> + /// Unmarshal object. + /// </summary> + /// <param name="stream">Stream over underlying byte array with correct position.</param> + /// <param name="mode">The mode.</param> + /// <returns> + /// Object.
+ /// </returns> + public T Unmarshal<T>(IBinaryStream stream, BinaryMode mode = BinaryMode.Deserialize) + { + return Unmarshal<T>(stream, mode, null); + } + + /// <summary> + /// Unmarshal object. + /// </summary> + /// <param name="stream">Stream over underlying byte array with correct position.</param> + /// <param name="mode">The mode.</param> + /// <param name="builder">Builder.</param> + /// <returns> + /// Object. + /// </returns> + public T Unmarshal<T>(IBinaryStream stream, BinaryMode mode, BinaryObjectBuilder builder) + { + return new BinaryReader(this, _idToDesc, stream, mode, builder).Deserialize<T>(); + } + + /// <summary> + /// Start unmarshal session. + /// </summary> + /// <param name="stream">Stream.</param> + /// <param name="keepBinary">Whether to keep binarizable as binary.</param> + /// <returns> + /// Reader. + /// </returns> + public BinaryReader StartUnmarshal(IBinaryStream stream, bool keepBinary) + { + return new BinaryReader(this, _idToDesc, stream, + keepBinary ? BinaryMode.KeepBinary : BinaryMode.Deserialize, null); + } + + /// <summary> + /// Start unmarshal session. + /// </summary> + /// <param name="stream">Stream.</param> + /// <param name="mode">The mode.</param> + /// <returns>Reader.</returns> + public BinaryReader StartUnmarshal(IBinaryStream stream, BinaryMode mode = BinaryMode.Deserialize) + { + return new BinaryReader(this, _idToDesc, stream, mode, null); + } + + /// <summary> + /// Gets metadata for the given type ID. + /// </summary> + /// <param name="typeId">Type ID.</param> + /// <returns>Metadata from the grid; <see cref="BinaryType.EmptyMeta"/> when no grid is attached + /// or the grid has no metadata for the type. Never null.</returns> + public IBinaryType GetBinaryType(int typeId) + { + // Read the settable property once so the null check and the call use the same instance + // (FinishMarshal takes the same snapshot). + var ignite = Ignite; + + if (ignite != null) + { + IBinaryType meta = ignite.GetBinaryType(typeId); + + if (meta != null) + return meta; + } + + return BinaryType.EmptyMeta; + } + + /// <summary> + /// Gets binary type handler for the given type descriptor.
+ /// </summary> + /// <param name="desc">Type descriptor.</param> + /// <returns>Binary type handler.</returns> + public IBinaryTypeHandler GetBinaryTypeHandler(IBinaryTypeDescriptor desc) + { + BinaryTypeHolder holder; + + // Fast path: look the holder up in the current map without taking the lock. + if (!_metas.TryGetValue(desc.TypeId, out holder)) + { + lock (this) + { + // Re-check under the lock: the holder may have been added since the first lookup. + if (!_metas.TryGetValue(desc.TypeId, out holder)) + { + // The published map is never mutated: a copy is extended and then assigned to the volatile field. + IDictionary<int, BinaryTypeHolder> metas0 = + new Dictionary<int, BinaryTypeHolder>(_metas); + + holder = new BinaryTypeHolder(desc.TypeId, desc.TypeName, desc.AffinityKeyFieldName); + + metas0[desc.TypeId] = holder; + + _metas = metas0; + } + } + } + + if (holder != null) + { + ICollection<int> ids = holder.FieldIds(); + + // The type is treated as new when no field IDs are cached and its metadata was never saved. + bool newType = ids.Count == 0 && !holder.Saved(); + + return new BinaryTypeHashsetHandler(ids, newType); + } + + return null; + } + + /// <summary> + /// Callback invoked when metadata has been sent to the server and acknowledged by it. + /// Merges the sent fields into the cached holder of each type. + /// </summary> + /// <param name="newMetas">Binary types keyed by type ID; values are cast to <see cref="BinaryType"/>.</param> + public void OnBinaryTypesSent(IDictionary<int, IBinaryType> newMetas) + { + foreach (KeyValuePair<int, IBinaryType> metaEntry in newMetas) + { + BinaryType meta = (BinaryType) metaEntry.Value; + + // Field ID -> (field name, field type ID). + IDictionary<int, Tuple<string, int>> mergeInfo = + new Dictionary<int, Tuple<string, int>>(meta.FieldsMap().Count); + + foreach (KeyValuePair<string, int> fieldMeta in meta.FieldsMap()) + { + int fieldId = BinaryUtils.FieldId(metaEntry.Key, fieldMeta.Key, null, null); + + mergeInfo[fieldId] = new Tuple<string, int>(fieldMeta.Key, fieldMeta.Value); + } + + // NOTE(review): the indexer throws KeyNotFoundException when no holder exists for this type ID; + // presumably a holder was always created by GetBinaryTypeHandler before the type is sent - confirm. + _metas[metaEntry.Key].Merge(mergeInfo); + } + } + + /// <summary> + /// Gets descriptor for type. + /// </summary> + /// <param name="type">Type.</param> + /// <returns>Descriptor, or null when the type is not registered.</returns> + public IBinaryTypeDescriptor GetDescriptor(Type type) + { + IBinaryTypeDescriptor desc; + + _typeToDesc.TryGetValue(type, out desc); + + return desc; + } + + /// <summary> + /// Gets descriptor for type name.
+ /// </summary> + /// <param name="typeName">Type name.</param> + /// <returns>Registered descriptor, or a new surrogate descriptor when the name is not registered.</returns> + public IBinaryTypeDescriptor GetDescriptor(string typeName) + { + IBinaryTypeDescriptor desc; + + return _typeNameToDesc.TryGetValue(typeName, out desc) ? desc : + new BinarySurrogateTypeDescriptor(_cfg, typeName); + } + + /// <summary> + /// Gets descriptor for type ID. + /// </summary> + /// <param name="userType">User type flag.</param> + /// <param name="typeId">Type ID.</param> + /// <returns>Registered descriptor; otherwise a new surrogate descriptor for a user type, or null for a non-user type.</returns> + public IBinaryTypeDescriptor GetDescriptor(bool userType, int typeId) + { + IBinaryTypeDescriptor desc; + + return _idToDesc.TryGetValue(BinaryUtils.TypeKey(userType, typeId), out desc) ? desc : + userType ? new BinarySurrogateTypeDescriptor(_cfg, typeId) : null; + } + + /// <summary> + /// Add user type. + /// </summary> + /// <param name="cfg">Configuration.</param> + /// <param name="typeCfg">Type configuration.</param> + /// <param name="typeResolver">The type resolver.</param> + /// <param name="dfltSerializer">The default serializer.</param> + private void AddUserType(BinaryConfiguration cfg, BinaryTypeConfiguration typeCfg, + TypeResolver typeResolver, IBinarySerializer dfltSerializer) + { + // Get converter/mapper/serializer: per-type settings take precedence over configuration defaults. + IBinaryNameMapper nameMapper = typeCfg.NameMapper ?? cfg.DefaultNameMapper; + + IBinaryIdMapper idMapper = typeCfg.IdMapper ?? cfg.DefaultIdMapper; + + bool keepDeserialized = typeCfg.KeepDeserialized ?? cfg.DefaultKeepDeserialized; + + // Try resolving type. + Type type = typeResolver.ResolveType(typeCfg.TypeName); + + if (type != null) + { + // Type is found. + var typeName = GetTypeName(type); + + int typeId = BinaryUtils.TypeId(typeName, nameMapper, idMapper); + + // Serializer precedence: per-type, configured default, IBinarizable serializer, shared default. + var serializer = typeCfg.Serializer ?? cfg.DefaultSerializer + ?? GetBinarizableSerializer(type) ?? 
dfltSerializer; + + var refSerializer = serializer as BinaryReflectiveSerializer; + + // A reflective serializer is registered for this type explicitly. + if (refSerializer != null) + refSerializer.Register(type, typeId, nameMapper, idMapper); + + AddType(type, typeId, typeName, true, keepDeserialized, nameMapper, idMapper, serializer, + typeCfg.AffinityKeyFieldName); + } + else + { + // Type is not found: it is registered by name only, with no runtime type and no serializer. + string typeName = BinaryUtils.SimpleTypeName(typeCfg.TypeName); + + int typeId = BinaryUtils.TypeId(typeName, nameMapper, idMapper); + + AddType(null, typeId, typeName, true, keepDeserialized, nameMapper, idMapper, null, + typeCfg.AffinityKeyFieldName); + } + } + + /// <summary> + /// Gets the <see cref="BinarizableSerializer"/> for a type if it is compatible, + /// i.e. the type's interfaces include <see cref="IBinarizable"/>. + /// </summary> + /// <param name="type">The type.</param> + /// <returns>Resulting <see cref="BinarizableSerializer"/>, or null.</returns> + private static IBinarySerializer GetBinarizableSerializer(Type type) + { + return type.GetInterfaces().Contains(typeof (IBinarizable)) + ? BinarizableSerializer.Instance + : null; + } + + /// <summary> + /// Add type.
+ /// </summary> + /// <param name="type">Type; may be null when only the type name is known.</param> + /// <param name="typeId">Type ID.</param> + /// <param name="typeName">Type name.</param> + /// <param name="userType">User type flag.</param> + /// <param name="keepDeserialized">Whether to cache deserialized value in IBinaryObject</param> + /// <param name="nameMapper">Name mapper.</param> + /// <param name="idMapper">ID mapper.</param> + /// <param name="serializer">Serializer.</param> + /// <param name="affKeyFieldName">Affinity key field name.</param> + /// <exception cref="BinaryObjectException">Type key is already registered, or a user type with the same name is already registered.</exception> + private void AddType(Type type, int typeId, string typeName, bool userType, + bool keepDeserialized, IBinaryNameMapper nameMapper, IBinaryIdMapper idMapper, + IBinarySerializer serializer, string affKeyFieldName) + { + long typeKey = BinaryUtils.TypeKey(userType, typeId); + + IBinaryTypeDescriptor conflictingType; + + if (_idToDesc.TryGetValue(typeKey, out conflictingType)) + { + // Report both sides of the conflict; fall back to the type name when no runtime type is known. + var type1 = conflictingType.Type != null + ? conflictingType.Type.AssemblyQualifiedName + : conflictingType.TypeName; + + var type2 = type != null ? type.AssemblyQualifiedName : typeName; + + throw new BinaryObjectException(string.Format("Conflicting type IDs [type1='{0}', " + + "type2='{1}', typeId={2}]", type1, type2, typeId)); + } + + if (userType && _typeNameToDesc.ContainsKey(typeName)) + throw new BinaryObjectException("Conflicting type name: " + typeName); + + IBinaryTypeDescriptor descriptor = + new BinaryFullTypeDescriptor(type, typeId, typeName, userType, nameMapper, idMapper, serializer, + keepDeserialized, affKeyFieldName); + + // Index the descriptor: by runtime type (when known), by name (user types only), and by type key. + if (type != null) + _typeToDesc[type] = descriptor; + + if (userType) + _typeNameToDesc[typeName] = descriptor; + + _idToDesc[typeKey] = descriptor; + } + + /// <summary> + /// Adds a predefined system type.
+ /// </summary> + /// <typeparam name="T">System type.</typeparam> + /// <param name="typeId">Predefined type ID.</param> + /// <param name="ctor">Factory creating an instance from a reader.</param> + private void AddSystemType<T>(byte typeId, Func<BinaryReader, T> ctor) where T : IBinaryWriteAware + { + var type = typeof(T); + + var serializer = new BinarySystemTypeSerializer<T>(ctor); + + // Registered as a non-user type, without keep-deserialized, mappers or affinity key field. + AddType(type, typeId, GetTypeName(type), false, false, null, null, serializer, null); + } + + /// <summary> + /// Adds predefined system types. + /// </summary> + private void AddSystemTypes() + { + AddSystemType(BinaryUtils.TypeNativeJobHolder, w => new ComputeJobHolder(w)); + AddSystemType(BinaryUtils.TypeComputeJobWrapper, w => new ComputeJobWrapper(w)); + AddSystemType(BinaryUtils.TypeIgniteProxy, w => new IgniteProxy()); + AddSystemType(BinaryUtils.TypeComputeOutFuncJob, w => new ComputeOutFuncJob(w)); + AddSystemType(BinaryUtils.TypeComputeOutFuncWrapper, w => new ComputeOutFuncWrapper(w)); + AddSystemType(BinaryUtils.TypeComputeFuncWrapper, w => new ComputeFuncWrapper(w)); + AddSystemType(BinaryUtils.TypeComputeFuncJob, w => new ComputeFuncJob(w)); + AddSystemType(BinaryUtils.TypeComputeActionJob, w => new ComputeActionJob(w)); + AddSystemType(BinaryUtils.TypeContinuousQueryRemoteFilterHolder, w => new ContinuousQueryFilterHolder(w)); + AddSystemType(BinaryUtils.TypeSerializableHolder, w => new SerializableObjectHolder(w)); + AddSystemType(BinaryUtils.TypeDateTimeHolder, w => new DateTimeHolder(w)); + AddSystemType(BinaryUtils.TypeCacheEntryProcessorHolder, w => new CacheEntryProcessorHolder(w)); + AddSystemType(BinaryUtils.TypeCacheEntryPredicateHolder, w => new CacheEntryFilterHolder(w)); + AddSystemType(BinaryUtils.TypeMessageListenerHolder, w => new MessageListenerHolder(w)); + AddSystemType(BinaryUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w)); + } + + /// <summary> + /// Gets the name of the type. + /// </summary> + /// <param name="type">The type.</param> + /// <returns> + /// Simple type name for non-generic types; simple type name with appended generic arguments for generic types.
+ /// </returns> + private static string GetTypeName(Type type) + { + if (type.IsGenericType) + { + // Generic arguments are named recursively and comma-separated inside square brackets. + var argNames = string.Join(",", type.GetGenericArguments().Select(GetTypeName)); + + return string.Format(CultureInfo.InvariantCulture, "{0}[{1}]", type.Name, argNames); + } + + return type.Name; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/894057e5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs new file mode 100644 index 0000000..3e9a28d --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryType.cs @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Binary.Metadata +{ + using System.Collections.Generic; + using System.Diagnostics.CodeAnalysis; + using Apache.Ignite.Core.Binary; + + /// <summary> + /// Binary metadata implementation. + /// </summary> + internal class BinaryType : IBinaryType + { + /** Empty metadata.
*/ + [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")] + public static readonly BinaryType EmptyMeta = + new BinaryType(BinaryUtils.TypeObject, BinaryTypeNames.TypeNameObject, null, null); + + /** Empty dictionary. */ + private static readonly IDictionary<string, int> EmptyDict = new Dictionary<string, int>(); + + /** Empty list. */ + private static readonly ICollection<string> EmptyList = new List<string>().AsReadOnly(); + + /** Fields: field name to field type ID; may be null. */ + private readonly IDictionary<string, int> _fields; + + /// <summary> + /// Get type name by type ID. + /// </summary> + /// <param name="typeId">Type ID.</param> + /// <returns>Type name.</returns> + /// <exception cref="BinaryObjectException">Type ID is not one of the IDs handled below.</exception> + private static string ConvertTypeName(int typeId) + { + switch (typeId) + { + case BinaryUtils.TypeBool: + return BinaryTypeNames.TypeNameBool; + case BinaryUtils.TypeByte: + return BinaryTypeNames.TypeNameByte; + case BinaryUtils.TypeShort: + return BinaryTypeNames.TypeNameShort; + case BinaryUtils.TypeChar: + return BinaryTypeNames.TypeNameChar; + case BinaryUtils.TypeInt: + return BinaryTypeNames.TypeNameInt; + case BinaryUtils.TypeLong: + return BinaryTypeNames.TypeNameLong; + case BinaryUtils.TypeFloat: + return BinaryTypeNames.TypeNameFloat; + case BinaryUtils.TypeDouble: + return BinaryTypeNames.TypeNameDouble; + case BinaryUtils.TypeDecimal: + return BinaryTypeNames.TypeNameDecimal; + case BinaryUtils.TypeString: + return BinaryTypeNames.TypeNameString; + case BinaryUtils.TypeGuid: + return BinaryTypeNames.TypeNameGuid; + case BinaryUtils.TypeTimestamp: + return BinaryTypeNames.TypeNameTimestamp; + case BinaryUtils.TypeEnum: + return BinaryTypeNames.TypeNameEnum; + // Binary and plain objects share the same type name. + case BinaryUtils.TypeBinary: + case BinaryUtils.TypeObject: + return BinaryTypeNames.TypeNameObject; + case BinaryUtils.TypeArrayBool: + return BinaryTypeNames.TypeNameArrayBool; + case BinaryUtils.TypeArrayByte: + return BinaryTypeNames.TypeNameArrayByte; + case BinaryUtils.TypeArrayShort: + return 
BinaryTypeNames.TypeNameArrayShort; + case BinaryUtils.TypeArrayChar: + return BinaryTypeNames.TypeNameArrayChar; + case BinaryUtils.TypeArrayInt: + return BinaryTypeNames.TypeNameArrayInt; + case BinaryUtils.TypeArrayLong: + return BinaryTypeNames.TypeNameArrayLong; + case BinaryUtils.TypeArrayFloat: + return BinaryTypeNames.TypeNameArrayFloat; + case BinaryUtils.TypeArrayDouble: + return BinaryTypeNames.TypeNameArrayDouble; + case BinaryUtils.TypeArrayDecimal: + return BinaryTypeNames.TypeNameArrayDecimal; + case BinaryUtils.TypeArrayString: + return BinaryTypeNames.TypeNameArrayString; + case BinaryUtils.TypeArrayGuid: + return BinaryTypeNames.TypeNameArrayGuid; + case BinaryUtils.TypeArrayTimestamp: + return BinaryTypeNames.TypeNameArrayTimestamp; + case BinaryUtils.TypeArrayEnum: + return BinaryTypeNames.TypeNameArrayEnum; + case BinaryUtils.TypeArray: + return BinaryTypeNames.TypeNameArrayObject; + case BinaryUtils.TypeCollection: + return BinaryTypeNames.TypeNameCollection; + case BinaryUtils.TypeDictionary: + return BinaryTypeNames.TypeNameMap; + default: + throw new BinaryObjectException("Invalid type ID: " + typeId); + } + } + + /// <summary> + /// Initializes a new instance of the <see cref="BinaryType" /> class from a raw reader. + /// Values are read in this order: type ID, type name, affinity key field name, fields map. + /// </summary> + /// <param name="reader">The reader.</param> + public BinaryType(IBinaryRawReader reader) + { + TypeId = reader.ReadInt(); + TypeName = reader.ReadString(); + AffinityKeyFieldName = reader.ReadString(); + _fields = reader.ReadDictionaryAsGeneric<string, int>(); + } + + /// <summary> + /// Constructor.
+ /// </summary> + /// <param name="typeId">Type ID.</param> + /// <param name="typeName">Type name.</param> + /// <param name="fields">Fields (field name to field type ID); may be null.</param> + /// <param name="affKeyFieldName">Affinity key field name.</param> + public BinaryType(int typeId, string typeName, IDictionary<string, int> fields, + string affKeyFieldName) + { + TypeId = typeId; + TypeName = typeName; + AffinityKeyFieldName = affKeyFieldName; + _fields = fields; + } + + /// <summary> + /// Type ID. + /// </summary> + public int TypeId { get; private set; } + + /// <summary> + /// Gets type name. + /// </summary> + public string TypeName { get; private set; } + + /// <summary> + /// Gets field names for that type. + /// </summary> + public ICollection<string> Fields + { + get { return _fields != null ? _fields.Keys : EmptyList; } + } + + /// <summary> + /// Gets field type for the given field name. + /// </summary> + /// <param name="fieldName">Field name.</param> + /// <returns> + /// Field type name, or null when there is no field metadata or the field is not known. + /// </returns> + public string GetFieldTypeName(string fieldName) + { + int typeId; + + // Only convert a type ID that was actually found. An unknown field is reported as null, + // the same way as a type without any field metadata, instead of converting the default ID 0. + if (_fields != null && _fields.TryGetValue(fieldName, out typeId)) + return ConvertTypeName(typeId); + + return null; + } + + /// <summary> + /// Gets optional affinity key field name. + /// </summary> + public string AffinityKeyFieldName { get; private set; } + + /// <summary> + /// Gets fields map. + /// </summary> + /// <returns>Fields map; a shared empty dictionary when there are no fields.</returns> + public IDictionary<string, int> FieldsMap() + { + return _fields ??
EmptyDict; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/894057e5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryTypeHashsetHandler.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryTypeHashsetHandler.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryTypeHashsetHandler.cs new file mode 100644 index 0000000..af5902f --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryTypeHashsetHandler.cs @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Binary.Metadata +{ + using System.Collections.Generic; + + /// <summary> + /// Metadata handler which uses hash set to determine whether field was already written or not. + /// </summary> + internal class BinaryTypeHashsetHandler : IBinaryTypeHandler + { + /** Empty fields collection. */ + private static readonly IDictionary<string, int> EmptyFields = new Dictionary<string, int>(); + + /** IDs known when serialization starts. */ + private readonly ICollection<int> _ids; + + /** New fields (field name to type ID); created lazily. 
*/ + private IDictionary<string, int> _fieldMap; + + /** Whether metadata for the type is not saved yet. */ + private readonly bool _newType; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="ids">IDs.</param> + /// <param name="newType">True if metadata for type is not saved.</param> + public BinaryTypeHashsetHandler(ICollection<int> ids, bool newType) + { + _ids = ids; + _newType = newType; + } + + /** <inheritdoc /> */ + public void OnFieldWrite(int fieldId, string fieldName, int typeId) + { + // Only fields whose ID is not already known are recorded. + if (!_ids.Contains(fieldId)) + { + if (_fieldMap == null) + _fieldMap = new Dictionary<string, int>(); + + // The first type ID recorded for a field name is kept. + if (!_fieldMap.ContainsKey(fieldName)) + _fieldMap[fieldName] = typeId; + } + } + + /** <inheritdoc /> */ + public IDictionary<string, int> OnObjectWriteFinished() + { + // New fields if any were recorded; otherwise an empty map for a new type, or null. + return _fieldMap ?? (_newType ? EmptyFields : null); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/894057e5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryTypeHolder.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryTypeHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryTypeHolder.cs new file mode 100644 index 0000000..524cda9 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/BinaryTypeHolder.cs @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Binary.Metadata +{ + using System; + using System.Collections.Generic; + using Apache.Ignite.Core.Binary; + + /// <summary> + /// Metadata for particular type. + /// </summary> + internal class BinaryTypeHolder + { + /** Type ID. */ + private readonly int _typeId; + + /** Type name. */ + private readonly string _typeName; + + /** Affinity key field name. */ + private readonly string _affKeyFieldName; + + /** Empty metadata when nothing is known about object fields yet. */ + private readonly IBinaryType _emptyMeta; + + /** Collection of known field IDs. */ + private volatile ICollection<int> _ids; + + /** Last known unmodifiable metadata which is given to the user. */ + private volatile BinaryType _meta; + + /** Saved flag (set if type metadata was saved at least once). */ + private volatile bool _saved; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="typeId">Type ID.</param> + /// <param name="typeName">Type name.</param> + /// <param name="affKeyFieldName">Affinity key field name.</param> + public BinaryTypeHolder(int typeId, string typeName, string affKeyFieldName) + { + _typeId = typeId; + _typeName = typeName; + _affKeyFieldName = affKeyFieldName; + + _emptyMeta = new BinaryType(typeId, typeName, null, affKeyFieldName); + } + + /// <summary> + /// Get saved flag. + /// </summary> + /// <returns>True if type metadata was saved at least once.</returns> + public bool Saved() + { + return _saved; + } + + /// <summary> + /// Get current type metadata. 
+ /// </summary> + /// <value>Type metadata.</value> + public IBinaryType BinaryType + { + get { return _meta ?? _emptyMeta; } + } + + /// <summary> + /// Currently cached field IDs. An empty set is created and cached on first access + /// if no IDs have been stored yet. + /// </summary> + /// <returns>Cached field IDs (never null).</returns> + public ICollection<int> FieldIds() + { + ICollection<int> ids0 = _ids; + + // Test the local snapshot, not the volatile field again: Merge() may publish _ids + // between the two reads, which would skip this branch and return a null snapshot. + if (ids0 == null) + { + lock (this) + { + ids0 = _ids; + + if (ids0 == null) + { + ids0 = new HashSet<int>(); + + _ids = ids0; + } + } + } + + return ids0; + } + + /// <summary> + /// Merge newly sent field metadatas into existing ones. + /// </summary> + /// <param name="newMap">New field metadatas map.</param> + public void Merge(IDictionary<int, Tuple<string, int>> newMap) + { + _saved = true; + + if (newMap == null || newMap.Count == 0) + return; + + lock (this) + { + // 1. Create copies of the old meta. + ICollection<int> ids0 = _ids; + BinaryType meta0 = _meta; + + ICollection<int> newIds = ids0 != null ? new HashSet<int>(ids0) : new HashSet<int>(); + + IDictionary<string, int> newFields = meta0 != null ? + new Dictionary<string, int>(meta0.FieldsMap()) : new Dictionary<string, int>(newMap.Count); + + // 2. Add new fields. + foreach (KeyValuePair<int, Tuple<string, int>> newEntry in newMap) + { + if (!newIds.Contains(newEntry.Key)) + newIds.Add(newEntry.Key); + + if (!newFields.ContainsKey(newEntry.Value.Item1)) + newFields[newEntry.Value.Item1] = newEntry.Value.Item2; + } + + // 3. Assign new meta. Order is important here: meta must be assigned before field IDs. 
+ _meta = new BinaryType(_typeId, _typeName, newFields, _affKeyFieldName); + _ids = newIds; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/894057e5/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/IBinaryTypeHandler.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/IBinaryTypeHandler.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/IBinaryTypeHandler.cs new file mode 100644 index 0000000..848a775 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Metadata/IBinaryTypeHandler.cs @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Binary.Metadata +{ + using System.Collections.Generic; + + /// <summary> + /// Binary type metadata handler. + /// </summary> + public interface IBinaryTypeHandler + { + /// <summary> + /// Callback invoked when named field is written. 
+ /// </summary> + /// <param name="fieldId">Field ID.</param> + /// <param name="fieldName">Field name.</param> + /// <param name="typeId">Field type ID.</param> + void OnFieldWrite(int fieldId, string fieldName, int typeId); + + /// <summary> + /// Callback invoked when object write is finished and it is time to collect missing metadata. + /// </summary> + /// <returns>Collected metadata; presumably field name mapped to field type ID, mirroring the + /// <c>OnFieldWrite</c> arguments. NOTE(review): an implementer may return null when nothing + /// was collected - confirm against implementers and callers.</returns> + IDictionary<string, int> OnObjectWriteFinished(); + } +}
