Repository: ignite Updated Branches: refs/heads/ignite-1.5 07f5a62ec -> 5dce6d982
IGNITE-1961: Binary format: optimized sequential reads. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5dce6d98 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5dce6d98 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5dce6d98 Branch: refs/heads/ignite-1.5 Commit: 5dce6d982dd756555bdc5eb868bd8a3fa1359aeb Parents: 07f5a62 Author: vozerov-gridgain <[email protected]> Authored: Mon Nov 23 11:06:23 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Mon Nov 23 11:06:23 2015 +0300 ---------------------------------------------------------------------- .../internal/portable/BinaryReaderExImpl.java | 211 +++++++++++-------- 1 file changed, 122 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5dce6d98/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java index 5d31670..2534fd7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java @@ -129,15 +129,15 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje /** */ private final int start; + /** Start of actual data. Positioned right after the header. */ + private final int dataStart; + /** Type ID. */ private final int typeId; /** Raw offset. */ private final int rawOff; - /** */ - private final int hdrLen; - /** Footer start. */ private final int footerStart; @@ -241,12 +241,12 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje int clsNameLen = in.position() - off; - hdrLen = DFLT_HDR_LEN + clsNameLen; + dataStart = start + DFLT_HDR_LEN + clsNameLen; } else { typeId = typeId0; - hdrLen = DFLT_HDR_LEN; + dataStart = start + DFLT_HDR_LEN; } idMapper = userType ? ctx.userTypeIdMapper(typeId) : null; @@ -255,7 +255,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje else { typeId = 0; rawOff = 0; - hdrLen = 0; + dataStart = 0; footerStart = 0; footerLen = 0; idMapper = null; @@ -266,7 +266,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje schema = null; } - in.position(start); + streamPosition(start); } /** @@ -297,10 +297,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje * @throws BinaryObjectException In case of error. */ public Object unmarshal(int offset) throws BinaryObjectException { - // Random reads prevent any further speculations. - matching = false; - - in.position(offset); + streamPosition(offset); return in.position() >= 0 ? unmarshal() : null; } @@ -395,18 +392,18 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje * @return Field. */ private <T> T readHandleField() { - int handle = (in.position() - 1) - in.readInt(); + int handlePos = positionForHandle() - in.readInt(); - int retPos = in.position(); - - Object obj = rCtx.get(handle); + Object obj = rCtx.get(handlePos); if (obj == null) { - in.position(handle); + int retPos = in.position(); + + streamPosition(handlePos); obj = doReadObject(); - in.position(retPos); + streamPosition(retPos); } return (T)obj; @@ -1387,7 +1384,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje /** {@inheritDoc} */ @Override public BinaryRawReader rawReader() { - in.position(rawOff); + streamPositionRandom(rawOff); return this; } @@ -1405,43 +1402,51 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje case NULL: return null; - case HANDLE: - int handle = start - in.readInt(); + case HANDLE: { + int handlePos = start - in.readInt(); + + Object obj = rCtx.get(handlePos); - BinaryObject handledPo = rCtx.get(handle); + if (obj == null) { + int retPos = in.position(); - if (handledPo != null) - return handledPo; + streamPosition(handlePos); - in.position(handle); + obj = unmarshal(); - return unmarshal(); + streamPosition(retPos); + } - case OBJ: + return obj; + } + + case OBJ: { PortableUtils.checkProtocolVersion(in.readByte()); + int len = PortableUtils.length(in, start); + BinaryObjectEx po; if (detach) { - in.position(start + GridPortableMarshaller.TOTAL_LEN_POS); - - int len = in.readInt(); - - in.position(start); + // In detach mode we simply copy object's content. + streamPosition(start); po = new BinaryObjectImpl(ctx, in.readByteArray(len), 0); } - else - po = in.offheapPointer() > 0 - ? new BinaryObjectOffheapImpl(ctx, in.offheapPointer(), start, - in.remaining() + in.position()) - : new BinaryObjectImpl(ctx, in.array(), start); + else { + if (in.offheapPointer() == 0) + po = new BinaryObjectImpl(ctx, in.array(), start); + else + po = new BinaryObjectOffheapImpl(ctx, in.offheapPointer(), start, + in.remaining() + in.position()); - rCtx.put(start, po); + streamPosition(start + po.length()); + } - in.position(start + po.length()); + rCtx.put(start, po); return po; + } case BYTE: return in.readByte(); @@ -1546,20 +1551,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje return doReadClass(); case OPTM_MARSH: - int len = in.readInt(); - - ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), len); - - Object obj; - - try { - obj = ctx.optimizedMarsh().unmarshal(input, null); - } - catch (IgniteCheckedException e) { - throw new BinaryObjectException("Failed to unmarshal object with optmMarsh marshaller", e); - } - - return obj; + return doReadOptimized(); default: throw new BinaryObjectException("Invalid flag value: " + flag); @@ -1592,11 +1584,13 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje return new String(doReadByteArray(), UTF_8); int strLen = in.readInt(); - int strOff = in.position(); - String res = new String(in.array(), strOff, strLen, UTF_8); + int pos = in.position(); - in.position(strOff + strLen); + // String will copy necessary array part for us. + String res = new String(in.array(), pos, strLen, UTF_8); + + streamPosition(pos + strLen); return res; } @@ -1657,18 +1651,18 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje break; case HANDLE: - int handle = start - in.readInt(); + int handlePos = start - in.readInt(); - obj = rCtx.get(handle); + obj = rCtx.get(handlePos); if (obj == null) { int retPos = in.position(); - in.position(handle); + streamPosition(handlePos); obj = doReadObject(); - in.position(retPos); + streamPosition(retPos); } break; @@ -1676,14 +1670,14 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje case OBJ: PortableClassDescriptor desc = ctx.descriptorForTypeId(userType, typeId, ldr); - in.position(start + hdrLen); + streamPosition(dataStart); if (desc == null) throw new BinaryInvalidTypeException("Unknown type ID: " + typeId); obj = desc.read(this); - in.position(footerStart + footerLen); + streamPosition(footerStart + footerLen); break; @@ -1863,18 +1857,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje break; case OPTM_MARSH: - int dataLen = in.readInt(); - - ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), dataLen); - - try { - obj = ctx.optimizedMarsh().unmarshal(input, null); - } - catch (IgniteCheckedException e) { - throw new BinaryObjectException("Failed to unmarshal object with optimized marshaller", e); - } - - in.position(in.position() + dataLen); + obj = doReadOptimized(); break; @@ -1886,6 +1869,27 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje } /** + * Read object serialized using optimized marshaller. + * + * @return Result. + */ + private Object doReadOptimized() { + int len = in.readInt(); + + ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), len); + + try { + return ctx.optimizedMarsh().unmarshal(input, null); + } + catch (IgniteCheckedException e) { + throw new BinaryObjectException("Failed to unmarshal object with optimized marshaller", e); + } + finally { + streamPosition(in.position() + len); + } + } + + /** * @return Value. */ private byte[] doReadByteArray() { @@ -2297,7 +2301,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje int pos = in.position(); - in.position(in.position() + len); + streamPosition(in.position() + len); int start = in.readInt(); @@ -2479,7 +2483,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje * @return Offset. */ public boolean findFieldByName(String name) { - assert hdrLen != 0; + assert dataStart != start; if (footerLen == 0) return false; @@ -2495,9 +2499,11 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje switch (confirm) { case CONFIRMED: // The best case: got order without ID calculation and (ID -> order) lookup. - order = expOrder; + if (expOrder == 0) + // When we read the very first field, position is set to start, hence this re-positioning. + streamPosition(dataStart); - break; + return true; case REJECTED: // Rejected, no more speculations are possible. Fallback to the slowest scenario. @@ -2518,7 +2524,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje // IDs matched, cache field name inside schema. schema.clarifyFieldName(expOrder, name); - order = expOrder; + if (expOrder == 0) + streamPosition(dataStart); + + return true; } else { // No match, stop further speculations. @@ -2544,10 +2553,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje * (string -> ID) calculations. * * @param id Field ID. - * @return Offset. + * @return {@code True} if field was found and stream was positioned accordingly. */ private boolean findFieldById(int id) { - assert hdrLen != 0; + assert dataStart != start; if (footerLen == 0) return false; @@ -2561,8 +2570,12 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje int realId = schema.fieldId(expOrder); - if (realId == id) - order = expOrder; + if (realId == id) { + if (expOrder == 0) + streamPosition(dataStart); + + return true; + } else { // Mismatch detected, no need for further speculations. matching = false; @@ -2580,10 +2593,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje } /** - * Set position for the given user field order and return it. + * Set position for the given user field order. * * @param order Order. - * @return Position. + * @return {@code True} if field was found and stream was positioned accordingly. */ private boolean trySetUserFieldPosition(int order) { if (order != PortableSchema.ORDER_NOT_FOUND) { @@ -2591,7 +2604,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje int pos = start + PortableUtils.fieldOffsetRelative(in, offsetPos, fieldOffsetLen); - in.position(pos); + streamPosition(pos); return true; } @@ -2600,10 +2613,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje } /** - * Set position for the given system field ID and return it. + * Set position for the given system field ID. * * @param id Field ID. - * @return Position. + * @return {@code True} if field was found and stream was positioned accordingly. */ private boolean trySetSystemFieldPosition(int id) { // System types are never written with compact footers because they do not have metadata. @@ -2622,7 +2635,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje int pos = start + PortableUtils.fieldOffsetRelative(in, searchPos + PortableUtils.FIELD_ID_LEN, fieldOffsetLen); - in.position(pos); + streamPosition(pos); return true; } @@ -2631,6 +2644,26 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje } } + /** + * Set stream position. + * + * @param pos Position. + */ + private void streamPosition(int pos) { + in.position(pos); + } + + /** + * Set stream position as a part of some random read. Further speculations will be disabled after this call. + * + * @param pos Position. + */ + private void streamPositionRandom(int pos) { + streamPosition(pos); + + matching = false; + } + /** {@inheritDoc} */ @Override public int readUnsignedByte() throws IOException { return readByte() & 0xff; @@ -2697,7 +2730,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje @Override public int skipBytes(int n) throws IOException { int toSkip = Math.min(in.remaining(), n); - in.position(in.position() + toSkip); + streamPositionRandom(in.position() + toSkip); return toSkip; }
