http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/binary/src/impl/binary/binary_type_handler.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_type_handler.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_type_handler.cpp new file mode 100644 index 0000000..5e70707 --- /dev/null +++ b/modules/platforms/cpp/binary/src/impl/binary/binary_type_handler.cpp @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/binary/binary_type_handler.h" + +using namespace ignite::common::concurrent; + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryTypeHandler::BinaryTypeHandler(SPSnap snap) : snap(snap), fieldIds(NULL), fields(NULL) + { + // No-op. + } + + BinaryTypeHandler::~BinaryTypeHandler() + { + if (fieldIds) + delete fieldIds; + + if (fields) + delete fields; + } + + void BinaryTypeHandler::OnFieldWritten(int32_t fieldId, std::string fieldName, int32_t fieldTypeId) + { + if (!snap.Get() || !snap.Get()->ContainsFieldId(fieldId)) + { + if (!HasDifference()) + { + fieldIds = new std::set<int32_t>(); + fields = new std::map<std::string, int32_t>(); + } + + fieldIds->insert(fieldId); + (*fields)[fieldName] = fieldTypeId; + } + } + + SPSnap BinaryTypeHandler::GetSnapshot() + { + return snap; + } + + bool BinaryTypeHandler::HasDifference() + { + return fieldIds ? true : false; + } + + std::set<int32_t>* BinaryTypeHandler::GetFieldIds() + { + return fieldIds; + } + + std::map<std::string, int32_t>* BinaryTypeHandler::GetFields() + { + return fields; + } + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/binary/src/impl/binary/binary_type_manager.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_type_manager.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_type_manager.cpp new file mode 100644 index 0000000..9bd115c --- /dev/null +++ b/modules/platforms/cpp/binary/src/impl/binary/binary_type_manager.cpp @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/common/concurrent.h> + +#include "ignite/impl/binary/binary_type_manager.h" + +using namespace ignite::common::concurrent; + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryTypeManager::BinaryTypeManager() : + snapshots(SharedPointer<std::map<int32_t, SPSnap>>(new std::map<int32_t, SPSnap>)), + pending(new std::vector<SPSnap>()), + cs(new CriticalSection()), + pendingVer(0), ver(0) + { + // No-op. + } + + BinaryTypeManager::~BinaryTypeManager() + { + pending->erase(pending->begin(), pending->end()); + + delete pending; + delete cs; + } + + SharedPointer<BinaryTypeHandler> BinaryTypeManager::GetHandler(int32_t typeId) + { + SharedPointer<std::map<int32_t, SPSnap>> snapshots0 = snapshots; + + SPSnap snapshot = (*snapshots0.Get())[typeId]; + + return SharedPointer<BinaryTypeHandler>(new BinaryTypeHandler(snapshot)); + } + + void BinaryTypeManager::SubmitHandler(std::string typeName, int32_t typeId, + BinaryTypeHandler* hnd) + { + Snap* snap = hnd->GetSnapshot().Get(); + + // If this is the very first write of a class or difference exists, + // we need to enqueue it for write. + if (!snap || hnd->HasDifference()) + { + std::set<int32_t>* newFieldIds = new std::set<int32_t>(); + std::map<std::string, int32_t>* newFields = new std::map<std::string, int32_t>(); + + CopyFields(snap, newFieldIds, newFields); + + if (hnd->HasDifference()) + { + std::set<int32_t>* diffFieldIds = hnd->GetFieldIds(); + std::map<std::string, int32_t>* diffFields = hnd->GetFields(); + + for (std::set<int32_t>::iterator it = diffFieldIds->begin(); it != diffFieldIds->end(); ++it) + newFieldIds->insert(*it); + + for (std::map<std::string, int32_t>::iterator it = diffFields->begin(); it != diffFields->end(); ++it) + (*newFields)[it->first] = it->second; + } + + Snap* diffSnap = new Snap(typeName, typeId, newFieldIds, newFields); + + cs->Enter(); + + pending->push_back(SPSnap(diffSnap)); + + pendingVer++; + + cs->Leave(); + } + } + + int32_t BinaryTypeManager::GetVersion() + { + Memory::Fence(); + + return ver; + } + + bool BinaryTypeManager::IsUpdatedSince(int32_t oldVer) + { + Memory::Fence(); + + return pendingVer > oldVer; + } + + bool BinaryTypeManager::ProcessPendingUpdates(BinaryTypeUpdater* updater, IgniteError* err) + { + bool success = true; // Optimistically assume that all will be fine. + + cs->Enter(); + + for (std::vector<SPSnap>::iterator it = pending->begin(); it != pending->end(); ++it) + { + Snap* pendingSnap = (*it).Get(); + + if (updater->Update(pendingSnap, err)) + { + // Perform copy-on-write update of snapshot collection. + std::map<int32_t, SPSnap>* newSnapshots = new std::map<int32_t, SPSnap>(); + + bool snapshotFound = false; + + for (std::map<int32_t, SPSnap>::iterator snapIt = snapshots.Get()->begin(); + snapIt != snapshots.Get()->end(); ++snapIt) + { + int32_t curTypeId = snapIt->first; + Snap* curSnap = snapIt->second.Get(); + + if (pendingSnap->GetTypeId() == curTypeId) + { + // Have to create snapshot with updated fields. + std::set<int32_t>* newFieldIds = new std::set<int32_t>(); + std::map<std::string, int32_t>* newFields = new std::map<std::string, int32_t>(); + + // Add old fields. + CopyFields(curSnap, newFieldIds, newFields); + + // Add new fields. + CopyFields(pendingSnap, newFieldIds, newFields); + + // Create new snapshot. + Snap* newSnap = new Snap(pendingSnap->GetTypeName(), pendingSnap->GetTypeId(), + newFieldIds, newFields); + + (*newSnapshots)[curTypeId] = SPSnap(newSnap); + + snapshotFound = true; + } + else + (*newSnapshots)[curTypeId] = snapIt->second; // Just transfer exising snapshot. + } + + // Handle situation when completely new snapshot is found. + if (!snapshotFound) + (*newSnapshots)[pendingSnap->GetTypeId()] = *it; + + snapshots = SharedPointer<std::map<int32_t, SPSnap>>(newSnapshots); + } + else + { + // Stop as we cannot move further. + success = false; + + break; + } + } + + if (success) + { + pending->erase(pending->begin(), pending->end()); + + ver = pendingVer; + } + + cs->Leave(); + + return success; + } + + void BinaryTypeManager::CopyFields(Snap* snap, std::set<int32_t>* fieldIds, + std::map<std::string, int32_t>* fields) + { + if (snap && snap->HasFields()) + { + std::set<int32_t>* snapFieldIds = snap->GetFieldIds(); + std::map<std::string, int32_t>* snapFields = snap->GetFields(); + + for (std::set<int32_t>::iterator oldIt = snapFieldIds->begin(); + oldIt != snapFieldIds->end(); ++oldIt) + fieldIds->insert(*oldIt); + + for (std::map<std::string, int32_t>::iterator newFieldsIt = snapFields->begin(); + newFieldsIt != snapFields->end(); ++newFieldsIt) + (*fields)[newFieldsIt->first] = newFieldsIt->second; + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/binary/src/impl/binary/binary_type_snapshot.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_type_snapshot.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_type_snapshot.cpp new file mode 100644 index 0000000..f34732f --- /dev/null +++ b/modules/platforms/cpp/binary/src/impl/binary/binary_type_snapshot.cpp @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/binary/binary_type_snapshot.h" + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryTypeSnapshot::BinaryTypeSnapshot(std::string typeName, int32_t typeId, + std::set<int32_t>* fieldIds, std::map<std::string, int32_t>* fields) : + typeName(typeName), typeId(typeId), fieldIds(fieldIds), fields(fields) + { + // No-op. + } + + BinaryTypeSnapshot::~BinaryTypeSnapshot() + { + delete fieldIds; + delete fields; + } + + bool BinaryTypeSnapshot::ContainsFieldId(int32_t fieldId) + { + return fieldIds && fieldIds->count(fieldId) == 1; + } + + std::string BinaryTypeSnapshot::GetTypeName() + { + return typeName; + } + + int32_t BinaryTypeSnapshot::GetTypeId() + { + return typeId; + } + + bool BinaryTypeSnapshot::HasFields() + { + return !fieldIds->empty(); + } + + std::set<int32_t>* BinaryTypeSnapshot::GetFieldIds() + { + return fieldIds; + } + + std::map<std::string, int32_t>* BinaryTypeSnapshot::GetFields() + { + return fields; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/binary/src/impl/binary/binary_type_updater.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_type_updater.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_type_updater.cpp new file mode 100644 index 0000000..b3436e9 --- /dev/null +++ b/modules/platforms/cpp/binary/src/impl/binary/binary_type_updater.cpp @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/binary/binary_type_updater.h" + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryTypeUpdater::~BinaryTypeUpdater() + { + // No-op. + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/binary/src/impl/binary/binary_utils.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_utils.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_utils.cpp new file mode 100644 index 0000000..1a1946c --- /dev/null +++ b/modules/platforms/cpp/binary/src/impl/binary/binary_utils.cpp @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <time.h> + +#include "ignite/impl/interop/interop.h" +#include "ignite/impl/binary/binary_utils.h" + +using namespace ignite::impl::interop; +using namespace ignite::impl::binary; + +namespace ignite +{ + namespace impl + { + namespace binary + { + int8_t BinaryUtils::ReadInt8(InteropInputStream* stream) + { + return stream->ReadInt8(); + } + + void BinaryUtils::WriteInt8(InteropOutputStream* stream, int8_t val) + { + stream->WriteInt8(val); + } + + void BinaryUtils::ReadInt8Array(InteropInputStream* stream, int8_t* res, const int32_t len) + { + stream->ReadInt8Array(res, len); + } + + void BinaryUtils::WriteInt8Array(InteropOutputStream* stream, const int8_t* val, const int32_t len) + { + stream->WriteInt8Array(val, len); + } + + bool BinaryUtils::ReadBool(InteropInputStream* stream) + { + return stream->ReadBool(); + } + + void BinaryUtils::WriteBool(InteropOutputStream* stream, bool val) + { + stream->WriteBool(val); + } + + void BinaryUtils::ReadBoolArray(InteropInputStream* stream, bool* res, const int32_t len) + { + stream->ReadBoolArray(res, len); + } + + void BinaryUtils::WriteBoolArray(InteropOutputStream* stream, const bool* val, const int32_t len) + { + stream->WriteBoolArray(val, len); + } + + int16_t BinaryUtils::ReadInt16(InteropInputStream* stream) + { + return stream->ReadInt16(); + } + + void BinaryUtils::WriteInt16(InteropOutputStream* stream, int16_t val) + { + stream->WriteInt16(val); + } + + void BinaryUtils::ReadInt16Array(InteropInputStream* stream, int16_t* res, const int32_t len) + { + stream->ReadInt16Array(res, len); + } + + void BinaryUtils::WriteInt16Array(InteropOutputStream* stream, const int16_t* val, const int32_t len) + { + stream->WriteInt16Array(val, len); + } + + uint16_t BinaryUtils::ReadUInt16(InteropInputStream* stream) + { + return stream->ReadUInt16(); + } + + void BinaryUtils::WriteUInt16(InteropOutputStream* stream, uint16_t val) + { + stream->WriteUInt16(val); + } + + void BinaryUtils::ReadUInt16Array(InteropInputStream* stream, uint16_t* res, const int32_t len) + { + stream->ReadUInt16Array(res, len); + } + + void BinaryUtils::WriteUInt16Array(InteropOutputStream* stream, const uint16_t* val, const int32_t len) + { + stream->WriteUInt16Array(val, len); + } + + int32_t BinaryUtils::ReadInt32(InteropInputStream* stream) + { + return stream->ReadInt32(); + } + + void BinaryUtils::WriteInt32(InteropOutputStream* stream, int32_t val) + { + stream->WriteInt32(val); + } + + void BinaryUtils::ReadInt32Array(InteropInputStream* stream, int32_t* res, const int32_t len) + { + stream->ReadInt32Array(res, len); + } + + void BinaryUtils::WriteInt32Array(InteropOutputStream* stream, const int32_t* val, const int32_t len) + { + stream->WriteInt32Array(val, len); + } + + int64_t BinaryUtils::ReadInt64(InteropInputStream* stream) + { + return stream->ReadInt64(); + } + + void BinaryUtils::WriteInt64(InteropOutputStream* stream, int64_t val) + { + stream->WriteInt64(val); + } + + void BinaryUtils::ReadInt64Array(InteropInputStream* stream, int64_t* res, const int32_t len) + { + stream->ReadInt64Array(res, len); + } + + void BinaryUtils::WriteInt64Array(InteropOutputStream* stream, const int64_t* val, const int32_t len) + { + stream->WriteInt64Array(val, len); + } + + float BinaryUtils::ReadFloat(InteropInputStream* stream) + { + return stream->ReadFloat(); + } + + void BinaryUtils::WriteFloat(InteropOutputStream* stream, float val) + { + stream->WriteFloat(val); + } + + void BinaryUtils::ReadFloatArray(InteropInputStream* stream, float* res, const int32_t len) + { + stream->ReadFloatArray(res, len); + } + + void BinaryUtils::WriteFloatArray(InteropOutputStream* stream, const float* val, const int32_t len) + { + stream->WriteFloatArray(val, len); + } + + double BinaryUtils::ReadDouble(InteropInputStream* stream) + { + return stream->ReadDouble(); + } + + void BinaryUtils::WriteDouble(InteropOutputStream* stream, double val) + { + stream->WriteDouble(val); + } + + void BinaryUtils::ReadDoubleArray(InteropInputStream* stream, double* res, const int32_t len) + { + stream->ReadDoubleArray(res, len); + } + + void BinaryUtils::WriteDoubleArray(InteropOutputStream* stream, const double* val, const int32_t len) + { + stream->WriteDoubleArray(val, len); + } + + Guid BinaryUtils::ReadGuid(interop::InteropInputStream* stream) + { + int64_t most = stream->ReadInt64(); + int64_t least = stream->ReadInt64(); + + return Guid(most, least); + } + + void BinaryUtils::WriteGuid(interop::InteropOutputStream* stream, const Guid val) + { + stream->WriteInt64(val.GetMostSignificantBits()); + stream->WriteInt64(val.GetLeastSignificantBits()); + } + + Date BinaryUtils::ReadDate(interop::InteropInputStream * stream) + { + int64_t milliseconds = stream->ReadInt64(); + + return Date(milliseconds); + } + + void BinaryUtils::WriteDate(interop::InteropOutputStream* stream, const Date val) + { + stream->WriteInt64(val.GetMilliseconds()); + } + + Timestamp BinaryUtils::ReadTimestamp(interop::InteropInputStream* stream) + { + int64_t milliseconds = stream->ReadInt64(); + int32_t nanoseconds = stream->ReadInt32(); + + return Timestamp(milliseconds / 1000, nanoseconds); + } + + void BinaryUtils::WriteTimestamp(interop::InteropOutputStream* stream, const Timestamp val) + { + stream->WriteInt64(val.GetSeconds() * 1000); + stream->WriteInt32(val.GetSecondFraction()); + } + + void BinaryUtils::WriteString(interop::InteropOutputStream* stream, const char* val, const int32_t len) + { + stream->WriteInt32(len); + stream->WriteInt8Array(reinterpret_cast<const int8_t*>(val), len); + } + + Date BinaryUtils::MakeDateGmt(int year, int month, int day, int hour, + int min, int sec) + { + tm date = { 0 }; + + date.tm_year = year - 1900; + date.tm_mon = month - 1; + date.tm_mday = day; + date.tm_hour = hour; + date.tm_min = min; + date.tm_sec = sec; + + return CTmToDate(date); + } + + Date BinaryUtils::MakeDateLocal(int year, int month, int day, int hour, + int min, int sec) + { + tm date = { 0 }; + + date.tm_year = year - 1900; + date.tm_mon = month - 1; + date.tm_mday = day; + date.tm_hour = hour; + date.tm_min = min; + date.tm_sec = sec; + + time_t localTime = common::IgniteTimeLocal(date); + + return CTimeToDate(localTime); + } + + Timestamp BinaryUtils::MakeTimestampGmt(int year, int month, int day, + int hour, int min, int sec, long ns) + { + tm date = { 0 }; + + date.tm_year = year - 1900; + date.tm_mon = month - 1; + date.tm_mday = day; + date.tm_hour = hour; + date.tm_min = min; + date.tm_sec = sec; + + return CTmToTimestamp(date, ns); + } + + Timestamp BinaryUtils::MakeTimestampLocal(int year, int month, int day, + int hour, int min, int sec, long ns) + { + tm date = { 0 }; + + date.tm_year = year - 1900; + date.tm_mon = month - 1; + date.tm_mday = day; + date.tm_hour = hour; + date.tm_min = min; + date.tm_sec = sec; + + time_t localTime = common::IgniteTimeLocal(date); + + return CTimeToTimestamp(localTime, ns); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/binary/src/impl/binary/binary_writer_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/binary/binary_writer_impl.cpp b/modules/platforms/cpp/binary/src/impl/binary/binary_writer_impl.cpp new file mode 100644 index 0000000..5df0c2a --- /dev/null +++ b/modules/platforms/cpp/binary/src/impl/binary/binary_writer_impl.cpp @@ -0,0 +1,770 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/ignite_error.h> + +#include "ignite/impl/binary/binary_writer_impl.h" +#include "ignite/impl/interop/interop_stream_position_guard.h" + +using namespace ignite::impl::interop; +using namespace ignite::impl::binary; +using namespace ignite::binary; + +namespace ignite +{ + namespace impl + { + namespace binary + { + BinaryWriterImpl::BinaryWriterImpl(InteropOutputStream* stream, BinaryIdResolver* idRslvr, + BinaryTypeManager* metaMgr, BinaryTypeHandler* metaHnd, int32_t start) : + stream(stream), idRslvr(idRslvr), metaMgr(metaMgr), metaHnd(metaHnd), typeId(idRslvr->GetTypeId()), + elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(-1), start(start) + { + // No-op. + } + + BinaryWriterImpl::BinaryWriterImpl(InteropOutputStream* stream, BinaryTypeManager* metaMgr) : + stream(stream), idRslvr(NULL), metaMgr(metaMgr), metaHnd(NULL), typeId(0), + elemIdGen(0), elemId(0), elemCnt(0), elemPos(-1), rawPos(0), start(stream->Position()) + { + // No-op. + } + + void BinaryWriterImpl::WriteInt8(const int8_t val) + { + WritePrimitiveRaw<int8_t>(val, BinaryUtils::WriteInt8); + } + + void BinaryWriterImpl::WriteInt8Array(const int8_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<int8_t>(val, len, BinaryUtils::WriteInt8Array, IGNITE_TYPE_ARRAY_BYTE); + } + + void BinaryWriterImpl::WriteInt8(const char* fieldName, const int8_t val) + { + WritePrimitive<int8_t>(fieldName, val, BinaryUtils::WriteInt8, IGNITE_TYPE_BYTE, 1); + } + + void BinaryWriterImpl::WriteInt8Array(const char* fieldName, const int8_t* val, const int32_t len) + { + WritePrimitiveArray<int8_t>(fieldName, val, len, BinaryUtils::WriteInt8Array, IGNITE_TYPE_ARRAY_BYTE, 0); + } + + void BinaryWriterImpl::WriteBool(const bool val) + { + WritePrimitiveRaw<bool>(val, BinaryUtils::WriteBool); + } + + void BinaryWriterImpl::WriteBoolArray(const bool* val, const int32_t len) + { + WritePrimitiveArrayRaw<bool>(val, len, BinaryUtils::WriteBoolArray, IGNITE_TYPE_ARRAY_BOOL); + } + + void BinaryWriterImpl::WriteBool(const char* fieldName, const bool val) + { + WritePrimitive<bool>(fieldName, val, BinaryUtils::WriteBool, IGNITE_TYPE_BOOL, 1); + } + + void BinaryWriterImpl::WriteBoolArray(const char* fieldName, const bool* val, const int32_t len) + { + WritePrimitiveArray<bool>(fieldName, val, len, BinaryUtils::WriteBoolArray, IGNITE_TYPE_ARRAY_BOOL, 0); + } + + void BinaryWriterImpl::WriteInt16(const int16_t val) + { + WritePrimitiveRaw<int16_t>(val, BinaryUtils::WriteInt16); + } + + void BinaryWriterImpl::WriteInt16Array(const int16_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<int16_t>(val, len, BinaryUtils::WriteInt16Array, IGNITE_TYPE_ARRAY_SHORT); + } + + void BinaryWriterImpl::WriteInt16(const char* fieldName, const int16_t val) + { + WritePrimitive<int16_t>(fieldName, val, BinaryUtils::WriteInt16, IGNITE_TYPE_SHORT, 2); + } + + void BinaryWriterImpl::WriteInt16Array(const char* fieldName, const int16_t* val, const int32_t len) + { + WritePrimitiveArray<int16_t>(fieldName, val, len, BinaryUtils::WriteInt16Array, IGNITE_TYPE_ARRAY_SHORT, 1); + } + + void BinaryWriterImpl::WriteUInt16(const uint16_t val) + { + WritePrimitiveRaw<uint16_t>(val, BinaryUtils::WriteUInt16); + } + + void BinaryWriterImpl::WriteUInt16Array(const uint16_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<uint16_t>(val, len, BinaryUtils::WriteUInt16Array, IGNITE_TYPE_ARRAY_CHAR); + } + + void BinaryWriterImpl::WriteUInt16(const char* fieldName, const uint16_t val) + { + WritePrimitive<uint16_t>(fieldName, val, BinaryUtils::WriteUInt16, IGNITE_TYPE_CHAR, 2); + } + + void BinaryWriterImpl::WriteUInt16Array(const char* fieldName, const uint16_t* val, const int32_t len) + { + WritePrimitiveArray<uint16_t>(fieldName, val, len, BinaryUtils::WriteUInt16Array, IGNITE_TYPE_ARRAY_CHAR, 1); + } + + void BinaryWriterImpl::WriteInt32(const int32_t val) + { + WritePrimitiveRaw<int32_t>(val, BinaryUtils::WriteInt32); + } + + void BinaryWriterImpl::WriteInt32Array(const int32_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<int32_t>(val, len, BinaryUtils::WriteInt32Array, IGNITE_TYPE_ARRAY_INT); + } + + void BinaryWriterImpl::WriteInt32(const char* fieldName, const int32_t val) + { + WritePrimitive<int32_t>(fieldName, val, BinaryUtils::WriteInt32, IGNITE_TYPE_INT, 4); + } + + void BinaryWriterImpl::WriteInt32Array(const char* fieldName, const int32_t* val, const int32_t len) + { + WritePrimitiveArray<int32_t>(fieldName, val, len, BinaryUtils::WriteInt32Array, IGNITE_TYPE_ARRAY_INT, 2); + } + + void BinaryWriterImpl::WriteInt64(const int64_t val) + { + WritePrimitiveRaw<int64_t>(val, BinaryUtils::WriteInt64); + } + + void BinaryWriterImpl::WriteInt64Array(const int64_t* val, const int32_t len) + { + WritePrimitiveArrayRaw<int64_t>(val, len, BinaryUtils::WriteInt64Array, IGNITE_TYPE_ARRAY_LONG); + } + + void BinaryWriterImpl::WriteInt64(const char* fieldName, const int64_t val) + { + WritePrimitive<int64_t>(fieldName, val, BinaryUtils::WriteInt64, IGNITE_TYPE_LONG, 8); + } + + void BinaryWriterImpl::WriteInt64Array(const char* fieldName, const int64_t* val, const int32_t len) + { + WritePrimitiveArray<int64_t>(fieldName, val, len, BinaryUtils::WriteInt64Array, IGNITE_TYPE_ARRAY_LONG, 3); + } + + void BinaryWriterImpl::WriteFloat(const float val) + { + WritePrimitiveRaw<float>(val, BinaryUtils::WriteFloat); + } + + void BinaryWriterImpl::WriteFloatArray(const float* val, const int32_t len) + { + WritePrimitiveArrayRaw<float>(val, len, BinaryUtils::WriteFloatArray, IGNITE_TYPE_ARRAY_FLOAT); + } + + void BinaryWriterImpl::WriteFloat(const char* fieldName, const float val) + { + WritePrimitive<float>(fieldName, val, BinaryUtils::WriteFloat, IGNITE_TYPE_FLOAT, 4); + } + + void BinaryWriterImpl::WriteFloatArray(const char* fieldName, const float* val, const int32_t len) + { + WritePrimitiveArray<float>(fieldName, val, len, BinaryUtils::WriteFloatArray, IGNITE_TYPE_ARRAY_FLOAT, 2); + } + + void BinaryWriterImpl::WriteDouble(const double val) + { + WritePrimitiveRaw<double>(val, BinaryUtils::WriteDouble); + } + + void BinaryWriterImpl::WriteDoubleArray(const double* val, const int32_t len) + { + WritePrimitiveArrayRaw<double>(val, len, BinaryUtils::WriteDoubleArray, IGNITE_TYPE_ARRAY_DOUBLE); + } + + void BinaryWriterImpl::WriteDouble(const char* fieldName, const double val) + { + WritePrimitive<double>(fieldName, val, BinaryUtils::WriteDouble, IGNITE_TYPE_DOUBLE, 8); + } + + void BinaryWriterImpl::WriteDoubleArray(const char* fieldName, const double* val, const int32_t len) + { + WritePrimitiveArray<double>(fieldName, val, len, BinaryUtils::WriteDoubleArray, IGNITE_TYPE_ARRAY_DOUBLE, 3); + } + + void BinaryWriterImpl::WriteGuid(const Guid val) + { + CheckRawMode(true); + CheckSingleMode(true); + + stream->WriteInt8(IGNITE_TYPE_UUID); + + BinaryUtils::WriteGuid(stream, val); + } + + void BinaryWriterImpl::WriteGuidArray(const Guid* val, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_ARRAY_UUID); + stream->WriteInt32(len); + + for (int i = 0; i < len; i++) + { + Guid elem = *(val + i); + + stream->WriteInt8(IGNITE_TYPE_UUID); + BinaryUtils::WriteGuid(stream, elem); + } + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteGuid(const char* fieldName, const Guid val) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_UUID); + + stream->WriteInt8(IGNITE_TYPE_UUID); + + BinaryUtils::WriteGuid(stream, val); + } + + void BinaryWriterImpl::WriteGuidArray(const char* fieldName, const Guid* val, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY_UUID); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_ARRAY_UUID); + stream->WriteInt32(len); + + for (int i = 0; i < len; i++) + { + Guid elem = *(val + i); + + WriteTopObject(elem); + } + } + else + { + stream->WriteInt8(IGNITE_HDR_NULL); + } + } + + void BinaryWriterImpl::WriteDate(const Date& val) + { + CheckRawMode(true); + CheckSingleMode(true); + + stream->WriteInt8(IGNITE_TYPE_DATE); + + BinaryUtils::WriteDate(stream, val); + } + + void BinaryWriterImpl::WriteDateArray(const Date* val, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_ARRAY_DATE); + stream->WriteInt32(len); + + for (int i = 0; i < len; i++) + { + const Date& elem = *(val + i); + + stream->WriteInt8(IGNITE_TYPE_DATE); + BinaryUtils::WriteDate(stream, elem); + } + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteDate(const char* fieldName, const Date& val) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_DATE); + + stream->WriteInt8(IGNITE_TYPE_DATE); + + BinaryUtils::WriteDate(stream, val); + } + + void BinaryWriterImpl::WriteDateArray(const char* fieldName, const Date* val, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY_DATE); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_ARRAY_DATE); + stream->WriteInt32(len); + + for (int i = 0; i < len; i++) + { + const Date& elem = *(val + i); + + WriteTopObject(elem); + } + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteTimestamp(const Timestamp& val) + { + CheckRawMode(true); + CheckSingleMode(true); + + stream->WriteInt8(IGNITE_TYPE_TIMESTAMP); + + BinaryUtils::WriteTimestamp(stream, val); + } + + void BinaryWriterImpl::WriteTimestampArray(const Timestamp* val, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_ARRAY_TIMESTAMP); + stream->WriteInt32(len); + + for (int i = 0; i < len; i++) + { + const Timestamp& elem = *(val + i); + + stream->WriteInt8(IGNITE_TYPE_TIMESTAMP); + BinaryUtils::WriteTimestamp(stream, elem); + } + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteTimestamp(const char* fieldName, const Timestamp& val) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_TIMESTAMP); + + stream->WriteInt8(IGNITE_TYPE_TIMESTAMP); + + BinaryUtils::WriteTimestamp(stream, val); + } + + void BinaryWriterImpl::WriteTimestampArray(const char* fieldName, const Timestamp* val, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY_TIMESTAMP); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_ARRAY_TIMESTAMP); + stream->WriteInt32(len); + + for (int i = 0; i < len; i++) + { + const Timestamp& elem = *(val + i); + + WriteTopObject(elem); + } + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteString(const char* val, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_STRING); + + BinaryUtils::WriteString(stream, val, len); + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteString(const char* fieldName, const char* val, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_STRING); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_STRING); + + BinaryUtils::WriteString(stream, val, len); + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + } + + int32_t BinaryWriterImpl::WriteStringArray() + { + StartContainerSession(true); + + stream->WriteInt8(IGNITE_TYPE_ARRAY_STRING); + stream->Position(stream->Position() + 4); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteStringArray(const char* fieldName) + { + StartContainerSession(false); + + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY_STRING); + + stream->WriteInt8(IGNITE_TYPE_ARRAY_STRING); + stream->Position(stream->Position() + 4); + + return elemId; + } + + void BinaryWriterImpl::WriteStringElement(int32_t id, const char* val, int32_t len) + { + CheckSession(id); + + if (val) + { + stream->WriteInt8(IGNITE_TYPE_STRING); + + BinaryUtils::WriteString(stream, val, len); + } + else + stream->WriteInt8(IGNITE_HDR_NULL); + + elemCnt++; + } + + void BinaryWriterImpl::WriteNull() + { + CheckRawMode(true); + CheckSingleMode(true); + + stream->WriteInt8(IGNITE_HDR_NULL); + } + + void BinaryWriterImpl::WriteNull(const char* fieldName) + { + CheckRawMode(false); + CheckSingleMode(true); + + WriteFieldId(fieldName, IGNITE_TYPE_OBJECT); + stream->WriteInt8(IGNITE_HDR_NULL); + } + + int32_t BinaryWriterImpl::WriteArray() + { + StartContainerSession(true); + + stream->WriteInt8(IGNITE_TYPE_ARRAY); + stream->Position(stream->Position() + 4); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteArray(const char* fieldName) + { + StartContainerSession(false); + + WriteFieldId(fieldName, IGNITE_TYPE_ARRAY); + + stream->WriteInt8(IGNITE_TYPE_ARRAY); + stream->Position(stream->Position() + 4); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteCollection(CollectionType typ) + { + StartContainerSession(true); + + stream->WriteInt8(IGNITE_TYPE_COLLECTION); + stream->Position(stream->Position() + 4); + stream->WriteInt8(typ); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteCollection(const char* fieldName, CollectionType typ) + { + StartContainerSession(false); + + WriteFieldId(fieldName, IGNITE_TYPE_COLLECTION); + + stream->WriteInt8(IGNITE_TYPE_COLLECTION); + stream->Position(stream->Position() + 4); + stream->WriteInt8(typ); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteMap(ignite::binary::MapType typ) + { + StartContainerSession(true); + + stream->WriteInt8(IGNITE_TYPE_MAP); + stream->Position(stream->Position() + 4); + stream->WriteInt8(typ); + + return elemId; + } + + int32_t BinaryWriterImpl::WriteMap(const char* fieldName, ignite::binary::MapType typ) + { + StartContainerSession(false); + + WriteFieldId(fieldName, IGNITE_TYPE_MAP); + + stream->WriteInt8(IGNITE_TYPE_MAP); + stream->Position(stream->Position() + 4); + stream->WriteInt8(typ); + + return elemId; + } + + void BinaryWriterImpl::CommitContainer(int32_t id) + { + CheckSession(id); + + stream->WriteInt32(elemPos + 1, elemCnt); + + elemId = 0; + elemCnt = 0; + elemPos = -1; + } + + void BinaryWriterImpl::SetRawMode() + { + CheckRawMode(false); + CheckSingleMode(true); + + rawPos = stream->Position(); + } + + int32_t BinaryWriterImpl::GetRawPosition() const + { + return rawPos == -1 ? stream->Position() : rawPos; + } + + void BinaryWriterImpl::CheckRawMode(bool expected) const + { + bool rawMode = rawPos != -1; + + if (expected && !rawMode) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation can be performed only in raw mode."); + } + else if (!expected && rawMode) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation cannot be performed in raw mode."); + } + } + + void BinaryWriterImpl::CheckSingleMode(bool expected) const + { + if (expected && elemId != 0) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation cannot be performed when container is being written."); + } + else if (!expected && elemId == 0) { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Operation can be performed only when container is being written."); + } + } + + void BinaryWriterImpl::StartContainerSession(bool expRawMode) + { + CheckRawMode(expRawMode); + CheckSingleMode(true); + + elemId = ++elemIdGen; + elemPos = stream->Position(); + } + + void BinaryWriterImpl::CheckSession(int32_t expSes) const + { + if (elemId != expSes) + { + IGNITE_ERROR_1(IgniteError::IGNITE_ERR_BINARY, "Containter write session has been finished or is not started yet."); + } + } + + void BinaryWriterImpl::WriteFieldId(const char* fieldName, int32_t fieldTypeId) + { + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldOff = stream->Position() - start; + + schema.AddField(fieldId, fieldOff); + + if (metaHnd) + metaHnd->OnFieldWritten(fieldId, fieldName, fieldTypeId); + } + + template <> + void BinaryWriterImpl::WriteTopObject<int8_t>(const int8_t& obj) + { + WriteTopObject0<int8_t>(obj, BinaryUtils::WriteInt8, IGNITE_TYPE_BYTE); + } + + template <> + void BinaryWriterImpl::WriteTopObject<bool>(const bool& obj) + { + WriteTopObject0<bool>(obj, BinaryUtils::WriteBool, IGNITE_TYPE_BOOL); + } + + template <> + void BinaryWriterImpl::WriteTopObject<int16_t>(const int16_t& obj) + { + WriteTopObject0<int16_t>(obj, BinaryUtils::WriteInt16, IGNITE_TYPE_SHORT); + } + + template <> + void BinaryWriterImpl::WriteTopObject<uint16_t>(const uint16_t& obj) + { + WriteTopObject0<uint16_t>(obj, BinaryUtils::WriteUInt16, IGNITE_TYPE_CHAR); + } + + template <> + void BinaryWriterImpl::WriteTopObject<int32_t>(const int32_t& obj) + { + WriteTopObject0<int32_t>(obj, BinaryUtils::WriteInt32, IGNITE_TYPE_INT); + } + + template <> + void BinaryWriterImpl::WriteTopObject<int64_t>(const int64_t& obj) + { + WriteTopObject0<int64_t>(obj, BinaryUtils::WriteInt64, IGNITE_TYPE_LONG); + } + + template <> + void BinaryWriterImpl::WriteTopObject<float>(const float& obj) + { + WriteTopObject0<float>(obj, BinaryUtils::WriteFloat, IGNITE_TYPE_FLOAT); + } + + template <> + void BinaryWriterImpl::WriteTopObject<double>(const double& obj) + { + WriteTopObject0<double>(obj, BinaryUtils::WriteDouble, IGNITE_TYPE_DOUBLE); + } + + template <> + void BinaryWriterImpl::WriteTopObject<Guid>(const Guid& obj) + { + WriteTopObject0<Guid>(obj, BinaryUtils::WriteGuid, IGNITE_TYPE_UUID); + } + + template <> + void BinaryWriterImpl::WriteTopObject<Date>(const Date& obj) + { + WriteTopObject0<Date>(obj, BinaryUtils::WriteDate, IGNITE_TYPE_DATE); + } + + template <> + void BinaryWriterImpl::WriteTopObject<Timestamp>(const Timestamp& obj) + { + WriteTopObject0<Timestamp>(obj, BinaryUtils::WriteTimestamp, IGNITE_TYPE_TIMESTAMP); + } + + void BinaryWriterImpl::PostWrite() + { + int32_t lenWithoutSchema = stream->Position() - start; + + int32_t nonRawLen = rawPos == -1 ? lenWithoutSchema : rawPos - start; + + uint16_t flags = IGNITE_BINARY_FLAG_USER_TYPE; + + if (rawPos > 0) + flags |= IGNITE_BINARY_FLAG_HAS_RAW; + + if (!HasSchema()) + { + stream->WriteInt16(start + IGNITE_OFFSET_FLAGS, flags); + stream->WriteInt32(start + IGNITE_OFFSET_LEN, lenWithoutSchema); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_ID, 0); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_OR_RAW_OFF, GetRawPosition() - start); + } + else + { + int32_t schemaId = schema.GetId(); + BinaryOffsetType schemaType = schema.GetType(); + + WriteAndClearSchema(); + + if (rawPos > 0) + stream->WriteInt32(rawPos - start); + + int32_t length = stream->Position() - start; + + flags |= IGNITE_BINARY_FLAG_HAS_SCHEMA; + + if (schemaType == OFFSET_TYPE_ONE_BYTE) + flags |= IGNITE_BINARY_FLAG_OFFSET_ONE_BYTE; + else if (schemaType == OFFSET_TYPE_TWO_BYTES) + flags |= IGNITE_BINARY_FLAG_OFFSET_TWO_BYTES; + + stream->WriteInt16(start + IGNITE_OFFSET_FLAGS, flags); + stream->WriteInt32(start + IGNITE_OFFSET_LEN, length); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_ID, schemaId); + stream->WriteInt32(start + IGNITE_OFFSET_SCHEMA_OR_RAW_OFF, lenWithoutSchema); + } + } + + bool BinaryWriterImpl::HasSchema() const + { + return !schema.Empty(); + } + + void BinaryWriterImpl::WriteAndClearSchema() + { + schema.Write(*stream); + + schema.Clear(); + } + + InteropOutputStream* BinaryWriterImpl::GetStream() + { + return stream; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp b/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp new file mode 100644 index 0000000..ce4f0ae --- /dev/null +++ b/modules/platforms/cpp/binary/src/impl/interop/interop_input_stream.cpp @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cstring> + +#include <ignite/ignite_error.h> + +#include "ignite/impl/interop//interop_input_stream.h" + +/** + * Common macro to read a single value. + */ +#define IGNITE_INTEROP_IN_READ(type, len) { \ + EnsureEnoughData(len); \ + type res = *reinterpret_cast<type*>(data + pos); \ + Shift(len); \ + return res; \ +} + +/** + * Common macro to read an array. + */ +#define IGNITE_INTEROP_IN_READ_ARRAY(len, shift) { \ + CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << shift); \ +} + +namespace ignite +{ + namespace impl + { + namespace interop + { + union BinaryInt32Float + { + int32_t i; + float f; + }; + + union BinaryInt64Double + { + int64_t i; + double d; + }; + + InteropInputStream::InteropInputStream(InteropMemory* mem) + { + this->mem = mem; + + data = mem->Data(); + len = mem->Length(); + pos = 0; + } + + int8_t InteropInputStream::ReadInt8() + { + IGNITE_INTEROP_IN_READ(int8_t, 1); + } + + int32_t InteropInputStream::ReadInt8(int32_t pos) + { + int delta = pos + 1 - this->pos; + + if (delta > 0) + EnsureEnoughData(delta); + + return *reinterpret_cast<int8_t*>(data + pos); + } + + void InteropInputStream::ReadInt8Array(int8_t* const res, const int32_t len) + { + IGNITE_INTEROP_IN_READ_ARRAY(len, 0); + } + + bool InteropInputStream::ReadBool() + { + return ReadInt8() == 1; + } + + void InteropInputStream::ReadBoolArray(bool* const res, const int32_t len) + { + for (int i = 0; i < len; i++) + *(res + i) = ReadBool(); + } + + int16_t InteropInputStream::ReadInt16() + { + IGNITE_INTEROP_IN_READ(int16_t, 2); + } + + int32_t InteropInputStream::ReadInt16(int32_t pos) + { + int delta = pos + 2 - this->pos; + + if (delta > 0) + EnsureEnoughData(delta); + + return *reinterpret_cast<int16_t*>(data + pos); + } + + void InteropInputStream::ReadInt16Array(int16_t* const res, const int32_t len) + { + IGNITE_INTEROP_IN_READ_ARRAY(len, 1); + } + + uint16_t InteropInputStream::ReadUInt16() + { + IGNITE_INTEROP_IN_READ(uint16_t, 2); + } + + void InteropInputStream::ReadUInt16Array(uint16_t* const res, const int32_t len) + { + IGNITE_INTEROP_IN_READ_ARRAY(len, 1); + } + + int32_t InteropInputStream::ReadInt32() + { + IGNITE_INTEROP_IN_READ(int32_t, 4); + } + + int32_t InteropInputStream::ReadInt32(int32_t pos) + { + int delta = pos + 4 - this->pos; + + if (delta > 0) + EnsureEnoughData(delta); + + return *reinterpret_cast<int32_t*>(data + pos); + } + + void InteropInputStream::ReadInt32Array(int32_t* const res, const int32_t len) + { + IGNITE_INTEROP_IN_READ_ARRAY(len, 2); + } + + int64_t InteropInputStream::ReadInt64() + { + IGNITE_INTEROP_IN_READ(int64_t, 8); + } + + void InteropInputStream::ReadInt64Array(int64_t* const res, const int32_t len) + { + IGNITE_INTEROP_IN_READ_ARRAY(len, 3); + } + + float InteropInputStream::ReadFloat() + { + BinaryInt32Float u; + + u.i = ReadInt32(); + + return u.f; + } + + void InteropInputStream::ReadFloatArray(float* const res, const int32_t len) + { + IGNITE_INTEROP_IN_READ_ARRAY(len, 2); + } + + double InteropInputStream::ReadDouble() + { + BinaryInt64Double u; + + u.i = ReadInt64(); + + return u.d; + } + + void InteropInputStream::ReadDoubleArray(double* const res, const int32_t len) + { + IGNITE_INTEROP_IN_READ_ARRAY(len, 3); + } + + int32_t InteropInputStream::Remaining() const + { + return len - pos; + } + + int32_t InteropInputStream::Position() const + { + return pos; + } + + void InteropInputStream::Position(int32_t pos) + { + if (pos > len) { + IGNITE_ERROR_FORMATTED_3(IgniteError::IGNITE_ERR_MEMORY, "Requested input stream position is out of bounds", + "memPtr", mem->PointerLong(), "len", len, "pos", pos); + } + + this->pos = pos; + } + + void InteropInputStream::Synchronize() + { + data = mem->Data(); + len = mem->Length(); + } + + void InteropInputStream::EnsureEnoughData(int32_t cnt) const + { + if (len - pos < cnt) { + IGNITE_ERROR_FORMATTED_4(IgniteError::IGNITE_ERR_MEMORY, "Not enough data in the stream", + "memPtr", mem->PointerLong(), "len", len, "pos", pos, "requested", cnt); + } + } + + void InteropInputStream::CopyAndShift(int8_t* dest, int32_t off, int32_t cnt) + { + EnsureEnoughData(cnt); + + memcpy(dest + off, data + pos, cnt); + + Shift(cnt); + } + + void InteropInputStream::Shift(int32_t cnt) + { + pos += cnt; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/binary/src/impl/interop/interop_memory.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/interop/interop_memory.cpp b/modules/platforms/cpp/binary/src/impl/interop/interop_memory.cpp new file mode 100644 index 0000000..926b7fb --- /dev/null +++ b/modules/platforms/cpp/binary/src/impl/interop/interop_memory.cpp @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/ignite_error.h> + +#include "ignite/impl/interop/interop_memory.h" + +namespace ignite +{ + namespace impl + { + namespace interop + { + int8_t* InteropMemory::Data(const int8_t* memPtr) + { + return reinterpret_cast<int8_t*>(*reinterpret_cast<const int64_t*>(memPtr)); + } + + void InteropMemory::Data(int8_t* memPtr, void* ptr) + { + *reinterpret_cast<int64_t*>(memPtr) = reinterpret_cast<int64_t>(ptr); + } + + int32_t InteropMemory::Capacity(const int8_t* memPtr) + { + return *reinterpret_cast<const int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_CAP); + } + + void InteropMemory::Capacity(int8_t* memPtr, int32_t val) + { + *reinterpret_cast<int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_CAP) = val; + } + + int32_t InteropMemory::Length(const int8_t* memPtr) + { + return *reinterpret_cast<const int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_LEN); + } + + void InteropMemory::Length(int8_t* memPtr, int32_t val) + { + *reinterpret_cast<int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_LEN) = val; + } + + int32_t InteropMemory::Flags(const int8_t* memPtr) + { + return *reinterpret_cast<const int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_FLAGS); + } + + void InteropMemory::Flags(int8_t* memPtr, int32_t val) + { + *reinterpret_cast<int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_FLAGS) = val; + } + + bool InteropMemory::IsExternal(const int8_t* memPtr) + { + return IsExternal(Flags(memPtr)); + } + + bool InteropMemory::IsExternal(int32_t flags) + { + return (flags & IGNITE_MEM_FLAG_EXT) != IGNITE_MEM_FLAG_EXT; + } + + bool InteropMemory::IsPooled(const int8_t* memPtr) + { + return IsPooled(Flags(memPtr)); + } + + bool InteropMemory::IsPooled(int32_t flags) + { + return (flags & IGNITE_MEM_FLAG_POOLED) != 0; + } + + bool InteropMemory::IsAcquired(const int8_t* memPtr) + { + return IsAcquired(Flags(memPtr)); + } + + bool InteropMemory::IsAcquired(int32_t flags) + { + return (flags & IGNITE_MEM_FLAG_ACQUIRED) != 0; + } + + int8_t* InteropMemory::Pointer() + { + return memPtr; + } + + int64_t InteropMemory::PointerLong() + { + return reinterpret_cast<int64_t>(memPtr); + } + + int8_t* InteropMemory::Data() + { + return Data(memPtr); + } + + int32_t InteropMemory::Capacity() const + { + return Capacity(memPtr); + } + + void InteropMemory::Capacity(int32_t val) + { + Capacity(memPtr, val); + } + + int32_t InteropMemory::Length() const + { + return Length(memPtr); + } + + void InteropMemory::Length(int32_t val) + { + Length(memPtr, val); + } + + InteropUnpooledMemory::InteropUnpooledMemory(int32_t cap) + { + memPtr = static_cast<int8_t*>(malloc(IGNITE_MEM_HDR_LEN)); + + Data(memPtr, malloc(cap)); + Capacity(memPtr, cap); + Length(memPtr, 0); + Flags(memPtr, IGNITE_MEM_FLAG_EXT); + + owning = true; + } + + InteropUnpooledMemory::InteropUnpooledMemory(int8_t* memPtr) + { + this->memPtr = memPtr; + this->owning = false; + } + + InteropUnpooledMemory::~InteropUnpooledMemory() + { + if (owning) { + free(Data()); + free(memPtr); + } + } + + void InteropUnpooledMemory::Reallocate(int32_t cap) + { + int doubledCap = Capacity() << 1; + + if (doubledCap > cap) + cap = doubledCap; + + Data(memPtr, realloc(Data(memPtr), cap)); + Capacity(memPtr, cap); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/binary/src/impl/interop/interop_output_stream.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/binary/src/impl/interop/interop_output_stream.cpp b/modules/platforms/cpp/binary/src/impl/interop/interop_output_stream.cpp new file mode 100644 index 0000000..374f634 --- /dev/null +++ b/modules/platforms/cpp/binary/src/impl/interop/interop_output_stream.cpp @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cstring> + +#include <ignite/ignite_error.h> + +#include "ignite/impl/interop//interop_output_stream.h" + +/** + * Common macro to write a single value. + */ +#define IGNITE_INTEROP_OUT_WRITE(val, type, len) { \ + EnsureCapacity(pos + len); \ + *reinterpret_cast<type*>(data + pos) = val; \ + Shift(len); \ +} + +/** + * Common macro to write an array. + */ +#define IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len) { \ + CopyAndShift(reinterpret_cast<const int8_t*>(val), 0, len); \ +} + +namespace ignite +{ + namespace impl + { + namespace interop + { + union BinaryFloatInt32 + { + float f; + int32_t i; + }; + + union BinaryDoubleInt64 + { + double d; + int64_t i; + }; + + InteropOutputStream::InteropOutputStream(InteropMemory* mem) + { + this->mem = mem; + + data = mem->Data(); + cap = mem->Capacity(); + pos = 0; + } + + void InteropOutputStream::WriteInt8(const int8_t val) + { + IGNITE_INTEROP_OUT_WRITE(val, int8_t, 1); + } + + void InteropOutputStream::WriteInt8(const int8_t val, const int32_t pos) + { + EnsureCapacity(pos + 1); + + *(data + pos) = val; + } + + void InteropOutputStream::WriteInt8Array(const int8_t* val, const int32_t len) + { + IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len); + } + + void InteropOutputStream::WriteBool(const bool val) + { + WriteInt8(val ? 1 : 0); + } + + void InteropOutputStream::WriteBoolArray(const bool* val, const int32_t len) + { + for (int i = 0; i < len; i++) + WriteBool(*(val + i)); + } + + void InteropOutputStream::WriteInt16(const int16_t val) + { + IGNITE_INTEROP_OUT_WRITE(val, int16_t, 2); + } + + void InteropOutputStream::WriteInt16(const int32_t pos, const int16_t val) + { + EnsureCapacity(pos + 2); + + *reinterpret_cast<int16_t*>(data + pos) = val; + } + + void InteropOutputStream::WriteInt16Array(const int16_t* val, const int32_t len) + { + IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len << 1); + } + + void InteropOutputStream::WriteUInt16(const uint16_t val) + { + IGNITE_INTEROP_OUT_WRITE(val, uint16_t, 2); + } + + void InteropOutputStream::WriteUInt16Array(const uint16_t* val, const int32_t len) + { + IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len << 1); + } + + void InteropOutputStream::WriteInt32(const int32_t val) + { + IGNITE_INTEROP_OUT_WRITE(val, int32_t, 4); + } + + void InteropOutputStream::WriteInt32(const int32_t pos, const int32_t val) + { + EnsureCapacity(pos + 4); + + *reinterpret_cast<int32_t*>(data + pos) = val; + } + + void InteropOutputStream::WriteInt32Array(const int32_t* val, const int32_t len) + { + IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len << 2); + } + + void InteropOutputStream::WriteInt64(const int64_t val) + { + IGNITE_INTEROP_OUT_WRITE(val, int64_t, 8); + } + + void InteropOutputStream::WriteInt64Array(const int64_t* val, const int32_t len) + { + IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len << 3); + } + + void InteropOutputStream::WriteFloat(const float val) + { + BinaryFloatInt32 u; + + u.f = val; + + WriteInt32(u.i); + } + + void InteropOutputStream::WriteFloatArray(const float* val, const int32_t len) + { + for (int i = 0; i < len; i++) + WriteFloat(*(val + i)); + } + + void InteropOutputStream::WriteDouble(const double val) + { + BinaryDoubleInt64 u; + + u.d = val; + + WriteInt64(u.i); + } + + void InteropOutputStream::WriteDoubleArray(const double* val, const int32_t len) + { + for (int i = 0; i < len; i++) + WriteDouble(*(val + i)); + } + + int32_t InteropOutputStream::Position() const + { + return pos; + } + + void InteropOutputStream::Position(const int32_t val) + { + EnsureCapacity(val); + + pos = val; + } + + int32_t InteropOutputStream::Reserve(int32_t num) + { + EnsureCapacity(pos + num); + + int32_t res = pos; + + Shift(num); + + return res; + } + + void InteropOutputStream::Synchronize() + { + mem->Length(pos); + } + + void InteropOutputStream::EnsureCapacity(int32_t reqCap) { + if (reqCap > cap) { + int newCap = cap << 1; + + if (newCap < reqCap) + newCap = reqCap; + + mem->Reallocate(newCap); + data = mem->Data(); + cap = newCap; + } + } + + void InteropOutputStream::Shift(int32_t cnt) { + pos += cnt; + } + + void InteropOutputStream::CopyAndShift(const int8_t* src, int32_t off, int32_t len) { + EnsureCapacity(pos + len); + + memcpy(data + pos, src + off, len); + + Shift(len); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/common/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/Makefile.am b/modules/platforms/cpp/common/Makefile.am index f5ca5dd..6956627 100644 --- a/modules/platforms/cpp/common/Makefile.am +++ b/modules/platforms/cpp/common/Makefile.am @@ -15,29 +15,39 @@ ## limitations under the License. ## -ACLOCAL_AMFLAGS = "-Im4" - -SUBDIRS = . include os/linux/include -DIST_SUBDIRS = . include os/linux/include - -AM_CPPFLAGS = -I$(srcdir)/include -I$(srcdir)/os/linux/include -I$(JAVA_HOME)/include -I$(JAVA_HOME)/include/linux -DIGNITE_IMPL -AM_CXXFLAGS = -Wall -std=c++0x -LIB_LDFLAGS = -no-undefined -version-info 1 - -COMMON_SRC = os/linux/src/concurrent_os.cpp \ - src/concurrent.cpp \ - src/java.cpp \ - src/exports.cpp \ - os/linux/src/common.cpp +ACLOCAL_AMFLAGS =-I m4 lib_LTLIBRARIES = libignite-common.la -libignite_common_la_SOURCES = $(COMMON_SRC) -libignite_common_la_LIBADD = -L$(JAVA_HOME)/jre/lib/amd64/server -libignite_common_la_LDFLAGS = $(LIB_LDFLAGS) -L/usr/local/lib -ljvm -version-info 0:0:0 -release $(PACKAGE_VERSION) - -pkgconfigdir = $(libdir)/pkgconfig -pkgconfig_DATA = ignite-common.pc +SUBDIRS = \ + include \ + os/linux/include + +AM_CPPFLAGS = \ + -I$(srcdir)/include \ + -I$(srcdir)/os/linux/include \ + -DIGNITE_IMPL + +AM_CXXFLAGS = \ + -Wall \ + -std=c++0x + +libignite_common_la_LDFLAGS = \ + -no-undefined \ + -L/usr/local/lib \ + -ldl \ + -version-info 0:0:0 \ + -release $(PACKAGE_VERSION) + +libignite_common_la_SOURCES = \ + os/linux/src/common/concurrent_os.cpp \ + os/linux/src/common/utils.cpp \ + src/common/concurrent.cpp \ + src/ignite_error.cpp \ + src/date.cpp \ + src/guid.cpp \ + src/timestamp.cpp + clean-local: $(RM) *.gcno *.gcda http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/common/configure.ac ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/configure.ac b/modules/platforms/cpp/common/configure.ac deleted file mode 100644 index b85fa3a..0000000 --- a/modules/platforms/cpp/common/configure.ac +++ /dev/null @@ -1,62 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# -*- Autoconf -*- -# Process this file with autoconf to produce a configure script. - -AC_PREREQ([2.69]) -AC_INIT([Apache Ignite JNI bridge for C++], [1.6.0.8653], [[email protected]], [ignite-common], [ignite.apache.org]) -AC_CONFIG_SRCDIR(src) - -AC_CANONICAL_SYSTEM -AC_CONFIG_MACRO_DIR([m4]) -AC_LANG([C++]) - -# Initialize automake -AM_INIT_AUTOMAKE([-Wall foreign subdir-objects]) -AC_CONFIG_HEADER(config.h) - -AM_PROG_AR - -# Checks for programs. -GXX="-g -O2" - -AC_PROG_CXX - -# Initialize Libtool -LT_INIT - -# Checks for libraries. -AC_CHECK_LIB([pthread], [pthread_mutex_lock]) - -# Checks for header files. - -# Checks for typedefs, structures, and compiler characteristics. -AC_C_INLINE -AC_TYPE_INT16_T -AC_TYPE_INT32_T -AC_TYPE_INT64_T -AC_TYPE_INT8_T -AC_TYPE_PID_T -AC_TYPE_SIZE_T - -# Checks for library functions. -AC_FUNC_ERROR_AT_LINE - -AC_CONFIG_FILES(Makefile include/Makefile os/linux/include/Makefile ignite-common.pc) - -AC_OUTPUT http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/common/ignite-common.pc.in ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/ignite-common.pc.in b/modules/platforms/cpp/common/ignite-common.pc.in deleted file mode 100644 index b8c40d2..0000000 --- a/modules/platforms/cpp/common/ignite-common.pc.in +++ /dev/null @@ -1,9 +0,0 @@ -prefix=@prefix@ -exec_prefix=@exec_prefix@ -libdir=@libdir@ -includedir=@includedir@ - -Name: ignite-common -Description: Apache Ignite JNI bridge for C++. -Version: @PACKAGE_VERSION@ -Libs: -L${libdir} -lignite-common http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/common/include/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/Makefile.am b/modules/platforms/cpp/common/include/Makefile.am index 0df9741..b18d6f8 100644 --- a/modules/platforms/cpp/common/include/Makefile.am +++ b/modules/platforms/cpp/common/include/Makefile.am @@ -15,12 +15,15 @@ ## limitations under the License. ## -ACLOCAL_AMFLAGS = "-Im4" +ACLOCAL_AMFLAGS =-I m4 -nobase_include_HEADERS = ignite/common/concurrent.h \ - ignite/common/java.h \ - ignite/common/exports.h \ - ignite/common/utils.h +nobase_include_HEADERS = \ + ignite/common/concurrent.h \ + ignite/common/utils.h \ + ignite/ignite_error.h \ + ignite/date.h \ + ignite/guid.h \ + ignite/timestamp.h uninstall-hook: - find ${includedir}/ignite -type d -empty -delete + if [ -d ${includedir}/ignite ]; then find ${includedir}/ignite -type d -empty -delete; fi http://git-wip-us.apache.org/repos/asf/ignite/blob/764c97b9/modules/platforms/cpp/common/include/ignite/common/concurrent.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/concurrent.h b/modules/platforms/cpp/common/include/ignite/common/concurrent.h index 1c84cf5..bc2d943 100644 --- a/modules/platforms/cpp/common/include/ignite/common/concurrent.h +++ b/modules/platforms/cpp/common/include/ignite/common/concurrent.h @@ -354,4 +354,4 @@ namespace ignite } } -#endif \ No newline at end of file +#endif //_IGNITE_COMMON_CONCURRENT \ No newline at end of file
