Repository: ignite Updated Branches: refs/heads/ignite-1770 85064003d -> bf93227f0
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf93227f/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp b/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp index a8196a1..644088b 100644 --- a/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/portable/portable_reader_impl.cpp @@ -35,18 +35,19 @@ namespace ignite namespace portable { PortableReaderImpl::PortableReaderImpl(InteropInputStream* stream, PortableIdResolver* idRslvr, - int32_t pos, bool usrType, int32_t typeId, int32_t hashCode, int32_t len, int32_t rawOff) : + int32_t pos, bool usrType, int32_t typeId, int32_t hashCode, int32_t len, int32_t rawOff, + int32_t footerBegin, int32_t footerEnd) : stream(stream), idRslvr(idRslvr), pos(pos), usrType(usrType), typeId(typeId), - hashCode(hashCode), len(len), rawOff(rawOff), rawMode(false), - elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0) + hashCode(hashCode), len(len), rawOff(rawOff), rawMode(false), elemIdGen(0), elemId(0), + elemCnt(-1), elemRead(0), footerBegin(footerBegin), footerEnd(footerEnd) { // No-op. } PortableReaderImpl::PortableReaderImpl(InteropInputStream* stream) : - stream(stream), idRslvr(NULL), pos(0), usrType(false), typeId(0), hashCode(0), - len(0), rawOff(0), rawMode(true), - elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0) + stream(stream), idRslvr(NULL), pos(0), usrType(false), typeId(0), hashCode(0), len(0), + rawOff(0), rawMode(true), elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0), footerBegin(-1), + footerEnd(-1) { // No-op. } @@ -233,12 +234,14 @@ namespace ignite CheckSingleMode(true); int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - int32_t fieldLen = SeekField(fieldId); + int32_t fieldPos = FindField(fieldId); - if (fieldLen > 0) - return ReadNullable(stream, PortableUtils::ReadGuid, IGNITE_TYPE_UUID); + if (fieldPos <= 0) + return Guid(); + + stream->Position(fieldPos); - return Guid(); + return ReadNullable(stream, PortableUtils::ReadGuid, IGNITE_TYPE_UUID); } int32_t PortableReaderImpl::ReadGuidArray(const char* fieldName, Guid* res, const int32_t len) @@ -249,20 +252,60 @@ namespace ignite int32_t pos = stream->Position(); int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - int32_t fieldLen = SeekField(fieldId); + int32_t fieldPos = FindField(fieldId); - if (fieldLen > 0) { - int32_t realLen = ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, IGNITE_TYPE_ARRAY_UUID); + if (fieldPos <= 0) + return -1; - // If actual read didn't occur return to initial position so that we do not perform - // N jumps to find the field again, where N is total amount of fields. - if (realLen != -1 && (!res || realLen > len)) - stream->Position(pos); + stream->Position(fieldPos); - return realLen; + int32_t realLen = ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, IGNITE_TYPE_ARRAY_UUID); + + return realLen; + } + + void PortableReaderImpl::ParseHeaderIfNeeded() + { + if (footerBegin) + return; + + InteropStreamPositionGuard<InteropInputStream> posGuard(*stream); + + int8_t hdr = stream->ReadInt8(); + + if (hdr != IGNITE_HDR_FULL) + IGNITE_ERROR_2(ignite::IgniteError::IGNITE_ERR_PORTABLE, "Invalid header: ", hdr); + + int8_t protoVer = stream->ReadInt8(); + + if (protoVer != IGNITE_PROTO_VER) { + IGNITE_ERROR_2(ignite::IgniteError::IGNITE_ERR_PORTABLE, + "Unsupported portable protocol version: ", protoVer); } - return -1; + int16_t flags = stream->ReadInt16(); + int32_t typeId = stream->ReadInt32(); + int32_t hashCode = stream->ReadInt32(); + int32_t len = stream->ReadInt32(); + int32_t schemaId = stream->ReadInt32(); + int32_t schemaOrRawOff = stream->ReadInt32(); + + if (flags & IGNITE_PORTABLE_FLAG_RAW_ONLY) + { + footerBegin = len; + + rawOff = schemaOrRawOff; + } + else + { + footerBegin = schemaOrRawOff; + + rawOff = (len - footerBegin) % 8 ? stream->ReadInt32(pos + len - 4) : schemaOrRawOff; + } + + footerEnd = len - ((len - footerBegin) % 8); + + bool usrType = flags & IGNITE_PORTABLE_FLAG_USER_OBJECT; } void PortableReaderImpl::ReadGuidArrayInternal(InteropInputStream* stream, Guid* res, const int32_t len) @@ -287,20 +330,16 @@ namespace ignite int32_t pos = stream->Position(); int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - int32_t fieldLen = SeekField(fieldId); + int32_t fieldPos = FindField(fieldId); - if (fieldLen > 0) { - int32_t realLen = ReadStringInternal(res, len); + if (fieldPos <= 0) + return -1; - // If actual read didn't occur return to initial position so that we do not perform - // N jumps to find the field again, where N is total amount of fields. - if (realLen != -1 && (!res || realLen > len)) - stream->Position(pos); + stream->Position(fieldPos); - return realLen; - } + int32_t realLen = ReadStringInternal(res, len); - return -1; + return realLen; } int32_t PortableReaderImpl::ReadStringArray(int32_t* size) @@ -314,15 +353,18 @@ namespace ignite CheckSingleMode(true); int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - int32_t fieldLen = SeekField(fieldId); + int32_t fieldPos = FindField(fieldId); - if (fieldLen > 0) - return StartContainerSession(false, IGNITE_TYPE_ARRAY_STRING, size); - else { + if (fieldPos <= 0) + { *size = -1; return ++elemIdGen; } + + stream->Position(fieldPos); + + return StartContainerSession(false, IGNITE_TYPE_ARRAY_STRING, size); } int32_t PortableReaderImpl::ReadStringElement(int32_t id, char* res, const int32_t len) @@ -389,15 +431,18 @@ namespace ignite CheckSingleMode(true); int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - int32_t fieldLen = SeekField(fieldId); + int32_t fieldPos = FindField(fieldId); - if (fieldLen > 0) - return StartContainerSession(false, IGNITE_TYPE_ARRAY, size); - else { + if (fieldPos <= 0) + { *size = -1; return ++elemIdGen; } + + stream->Position(fieldPos); + + return StartContainerSession(false, IGNITE_TYPE_ARRAY, size); } int32_t PortableReaderImpl::ReadCollection(CollectionType* typ, int32_t* size) @@ -418,25 +463,26 @@ namespace ignite CheckSingleMode(true); int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - int32_t fieldLen = SeekField(fieldId); + int32_t fieldPos = FindField(fieldId); - if (fieldLen > 0) + if (fieldPos <= 0) { - int32_t id = StartContainerSession(false, IGNITE_TYPE_COLLECTION, size); - - if (*size == -1) - *typ = IGNITE_COLLECTION_UNDEFINED; - else - *typ = static_cast<CollectionType>(stream->ReadInt8()); - - return id; - } - else { *typ = IGNITE_COLLECTION_UNDEFINED; *size = -1; return ++elemIdGen; } + + stream->Position(fieldPos); + + int32_t id = StartContainerSession(false, IGNITE_TYPE_COLLECTION, size); + + if (*size == -1) + *typ = IGNITE_COLLECTION_UNDEFINED; + else + *typ = static_cast<CollectionType>(stream->ReadInt8()); + + return id; } int32_t PortableReaderImpl::ReadMap(MapType* typ, int32_t* size) @@ -457,25 +503,26 @@ namespace ignite CheckSingleMode(true); int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - int32_t fieldLen = SeekField(fieldId); + int32_t fieldPos = FindField(fieldId); - if (fieldLen > 0) + if (fieldPos <= 0) { - int32_t id = StartContainerSession(false, IGNITE_TYPE_MAP, size); - - if (*size == -1) - *typ = IGNITE_MAP_UNDEFINED; - else - *typ = static_cast<MapType>(stream->ReadInt8()); - - return id; - } - else { *typ = IGNITE_MAP_UNDEFINED; *size = -1; return ++elemIdGen; } + + stream->Position(fieldPos); + + int32_t id = StartContainerSession(false, IGNITE_TYPE_MAP, size); + + if (*size == -1) + *typ = IGNITE_MAP_UNDEFINED; + else + *typ = static_cast<MapType>(stream->ReadInt8()); + + return id; } CollectionType PortableReaderImpl::ReadCollectionTypeUnprotected() @@ -504,11 +551,13 @@ namespace ignite InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream); int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - int32_t fieldLen = SeekField(fieldId); + int32_t fieldPos = FindField(fieldId); - if (fieldLen <= 0) + if (fieldPos <= 0) return IGNITE_COLLECTION_UNDEFINED; + stream->Position(fieldPos); + return ReadCollectionTypeUnprotected(); } @@ -544,11 +593,13 @@ namespace ignite InteropStreamPositionGuard<InteropInputStream> positionGuard(*stream); int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - int32_t fieldLen = SeekField(fieldId); + int32_t fieldPos = FindField(fieldId); - if (fieldLen <= 0) + if (fieldPos <= 0) return -1; + stream->Position(fieldPos); + return ReadCollectionSizeUnprotected(); } @@ -635,41 +686,18 @@ namespace ignite return stream; } - int32_t PortableReaderImpl::SeekField(const int32_t fieldId) + int32_t PortableReaderImpl::FindField(const int32_t fieldId) { - // We assume that it is very likely that fields are read in the same - // order as they were initially written. So we start seeking field - // from current stream position making a "loop" up to this position. - int32_t marker = stream->Position(); - - for (int32_t curPos = marker; curPos < pos + rawOff;) - { - int32_t curFieldId = stream->ReadInt32(); - int32_t curFieldLen = stream->ReadInt32(); + InteropStreamPositionGuard<InteropInputStream> streamGuard(*stream); - if (fieldId == curFieldId) - return curFieldLen; - else { - curPos = stream->Position() + curFieldLen; + stream->Position(footerBegin); - stream->Position(curPos); - } - } - - stream->Position(pos + IGNITE_FULL_HDR_LEN); - - for (int32_t curPos = stream->Position(); curPos < marker;) + for (int32_t schemaPos = footerBegin; schemaPos < footerEnd; schemaPos += 8) { - int32_t curFieldId = stream->ReadInt32(); - int32_t curFieldLen = stream->ReadInt32(); + int32_t currentFieldId = stream->ReadInt32(schemaPos); - if (fieldId == curFieldId) - return curFieldLen; - else { - curPos = stream->Position() + curFieldLen; - - stream->Position(curPos); - } + if (fieldId == currentFieldId) + return stream->ReadInt32(schemaPos + 4) + pos; } return -1; http://git-wip-us.apache.org/repos/asf/ignite/blob/bf93227f/modules/platforms/cpp/core/src/impl/portable/portable_schema.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/portable/portable_schema.cpp b/modules/platforms/cpp/core/src/impl/portable/portable_schema.cpp new file mode 100644 index 0000000..57c23a7 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/portable/portable_schema.cpp @@ -0,0 +1,88 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "ignite/impl/portable/portable_schema.h" +#include "ignite/impl/portable/portable_writer_impl.h" + +/** FNV1 hash offset basis. */ +enum { FNV1_OFFSET_BASIS = 0x811C9DC5 }; + +/** FNV1 hash prime. */ +enum { FNV1_PRIME = 0x01000193 }; + +namespace ignite +{ + namespace impl + { + namespace portable + { + PortableSchema::PortableSchema(): id(0), fieldsInfo(new FieldContainer()) + { + // No-op. + } + + PortableSchema::~PortableSchema() + { + delete fieldsInfo; + } + + void PortableSchema::AddField(int32_t fieldId, int32_t offset) + { + if (!id) + { + // Initialize offset when the first field is written. + id = FNV1_OFFSET_BASIS; + } + + // Advance schema hash. + int32_t idAccumulator = id ^ (fieldId & 0xFF); + idAccumulator *= FNV1_PRIME; + idAccumulator ^= (fieldId >> 8) & 0xFF; + idAccumulator *= FNV1_PRIME; + idAccumulator ^= (fieldId >> 16) & 0xFF; + idAccumulator *= FNV1_PRIME; + idAccumulator ^= (fieldId >> 24) & 0xFF; + idAccumulator *= FNV1_PRIME; + + id = idAccumulator; + + PortableSchemaFieldInfo info = { fieldId, offset }; + fieldsInfo->push_back(info); + } + + void PortableSchema::Write(interop::InteropOutputStream& out) const + { + for (FieldContainer::const_iterator i = fieldsInfo->begin(); i != fieldsInfo->end(); ++i) + { + out.WriteInt32(i->id); + out.WriteInt32(i->offset); + } + } + + bool PortableSchema::Empty() const + { + return fieldsInfo->empty(); + } + + void PortableSchema::Clear() + { + id = 0; + fieldsInfo->clear(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/bf93227f/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp b/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp index b16a934..4b65b4b 100644 --- a/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/portable/portable_writer_impl.cpp @@ -16,6 +16,7 @@ */ #include "ignite/impl/portable/portable_writer_impl.h" +#include "ignite/impl/interop/interop_stream_position_guard.h" #include "ignite/ignite_error.h" using namespace ignite::impl::interop; @@ -29,16 +30,16 @@ namespace ignite namespace portable { PortableWriterImpl::PortableWriterImpl(InteropOutputStream* stream, PortableIdResolver* idRslvr, - PortableMetadataManager* metaMgr, PortableMetadataHandler* metaHnd) : + PortableMetadataManager* metaMgr, PortableMetadataHandler* metaHnd, int32_t start) : stream(stream), idRslvr(idRslvr), metaMgr(metaMgr), metaHnd(metaHnd), typeId(idRslvr->GetTypeId()), - elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(-1) + elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(-1), start(start) { // No-op. } PortableWriterImpl::PortableWriterImpl(InteropOutputStream* stream, PortableMetadataManager* metaMgr) : stream(stream), idRslvr(NULL), metaMgr(metaMgr), metaHnd(NULL), typeId(0), - elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(0) + elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(0), start(stream->Position()) { // No-op. } @@ -240,7 +241,7 @@ namespace ignite CheckRawMode(false); CheckSingleMode(true); - WriteFieldIdAndLength(fieldName, IGNITE_TYPE_UUID, 1 + 16); + WriteFieldId(fieldName, IGNITE_TYPE_UUID); stream->WriteInt8(IGNITE_TYPE_UUID); @@ -256,7 +257,6 @@ namespace ignite if (val) { - stream->WriteInt32(5 + len * 17); stream->WriteInt8(IGNITE_TYPE_ARRAY_UUID); stream->WriteInt32(len); @@ -269,7 +269,6 @@ namespace ignite } else { - stream->WriteInt32(1); stream->WriteInt8(IGNITE_HDR_NULL); } } @@ -298,21 +297,15 @@ namespace ignite if (val) { - int32_t lenPos = stream->Position(); - stream->Position(lenPos + 4); - stream->WriteInt8(IGNITE_TYPE_STRING); stream->WriteBool(false); stream->WriteInt32(len); for (int i = 0; i < len; i++) stream->WriteUInt16(*(val + i)); - - stream->WriteInt32(lenPos, stream->Position() - lenPos - 4); } else { - stream->WriteInt32(1); stream->WriteInt8(IGNITE_HDR_NULL); } } @@ -331,7 +324,7 @@ namespace ignite { StartContainerSession(false); - WriteFieldIdSkipLength(fieldName, IGNITE_TYPE_ARRAY_STRING); + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY_STRING); stream->WriteInt8(IGNITE_TYPE_ARRAY_STRING); stream->Position(stream->Position() + 4); @@ -368,7 +361,7 @@ namespace ignite CheckRawMode(false); CheckSingleMode(true); - WriteFieldIdAndLength(fieldName, IGNITE_TYPE_OBJECT, 1); + WriteFieldId(fieldName, IGNITE_TYPE_OBJECT); stream->WriteInt8(IGNITE_HDR_NULL); } @@ -386,7 +379,7 @@ namespace ignite { StartContainerSession(false); - WriteFieldIdSkipLength(fieldName, IGNITE_TYPE_ARRAY); + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY); stream->WriteInt8(IGNITE_TYPE_ARRAY); stream->Position(stream->Position() + 4); @@ -409,7 +402,7 @@ namespace ignite { StartContainerSession(false); - WriteFieldIdSkipLength(fieldName, IGNITE_TYPE_COLLECTION); + WriteFieldId(fieldName, IGNITE_TYPE_COLLECTION); stream->WriteInt8(IGNITE_TYPE_COLLECTION); stream->Position(stream->Position() + 4); @@ -433,7 +426,7 @@ namespace ignite { StartContainerSession(false); - WriteFieldIdSkipLength(fieldName, IGNITE_TYPE_MAP); + WriteFieldId(fieldName, IGNITE_TYPE_MAP); stream->WriteInt8(IGNITE_TYPE_MAP); stream->Position(stream->Position() + 4); @@ -446,15 +439,7 @@ namespace ignite { CheckSession(id); - if (rawPos == -1) - { - int32_t len = stream->Position() - elemPos - 4; - - stream->WriteInt32(elemPos + 4, len); - stream->WriteInt32(elemPos + 9, elemCnt); - } - else - stream->WriteInt32(elemPos + 1, elemCnt); + stream->WriteInt32(elemPos + 1, elemCnt); elemId = 0; elemCnt = 0; @@ -516,27 +501,14 @@ namespace ignite void PortableWriterImpl::WriteFieldId(const char* fieldName, int32_t fieldTypeId) { int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); - - stream->WriteInt32(fieldId); + int32_t fieldOff = stream->Position() - start; + + schema.AddField(fieldId, fieldOff); if (metaHnd) metaHnd->OnFieldWritten(fieldId, fieldName, fieldTypeId); } - void PortableWriterImpl::WriteFieldIdSkipLength(const char* fieldName, int32_t fieldTypeId) - { - WriteFieldId(fieldName, fieldTypeId); - - stream->Position(stream->Position() + 4); - } - - void PortableWriterImpl::WriteFieldIdAndLength(const char* fieldName, int32_t fieldTypeId, int32_t len) - { - WriteFieldId(fieldName, fieldTypeId); - - stream->WriteInt32(len); - } - template <> void PortableWriterImpl::WriteTopObject<int8_t>(const int8_t& obj) { @@ -591,6 +563,50 @@ namespace ignite WriteTopObject0<Guid>(obj, PortableUtils::WriteGuid, IGNITE_TYPE_UUID); } + void PortableWriterImpl::PostWrite() + { + int32_t lenWithoutSchema = stream->Position() - start; + + if (schema.Empty()) + { + InteropStreamPositionGuard<InteropOutputStream> guard(*stream); + + stream->Position(start + IGNITE_OFFSET_FLAGS); + stream->WriteInt16(IGNITE_PORTABLE_FLAG_USER_OBJECT | IGNITE_PORTABLE_FLAG_RAW_ONLY); + + stream->WriteInt32(start + IGNITE_OFFSET_LEN, lenWithoutSchema); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_ID, 0); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_OR_RAW_OFF, GetRawPosition() - start); + } + else + { + int32_t schemaId = schema.GetId(); + + WriteAndClearSchema(); + + if (rawPos > 0) + stream->WriteInt32(rawPos - start); + + int32_t length = stream->Position() - start; + + stream->WriteInt32(start + IGNITE_OFFSET_LEN, length); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_ID, schemaId); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_OR_RAW_OFF, lenWithoutSchema); + } + } + + bool PortableWriterImpl::HasSchema() const + { + return !schema.Empty(); + } + + void PortableWriterImpl::WriteAndClearSchema() + { + schema.Write(*stream); + + schema.Clear(); + } + InteropOutputStream* PortableWriterImpl::GetStream() { return stream;
