http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp new file mode 100644 index 0000000..d1800b3 --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp @@ -0,0 +1,185 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include "gridgain/impl/cache/query/query_impl.h" + +using namespace ignite::common::concurrent; +using namespace ignite::common::java; +using namespace gridgain::impl::interop; +using namespace gridgain::impl::portable; + +namespace gridgain +{ + namespace impl + { + namespace cache + { + namespace query + { + /** Operation: get all entries. */ + const int32_t OP_GET_ALL = 1; + + /** Operation: get single entry. */ + const int32_t OP_GET_SINGLE = 3; + + QueryCursorImpl::QueryCursorImpl(SharedPointer<GridEnvironment> env, jobject javaRef) : + env(env), javaRef(javaRef), iterCalled(false), getAllCalled(false), hasNext(false) + { + // No-op. + } + + QueryCursorImpl::~QueryCursorImpl() + { + // 1. Close the cursor. + env.Get()->Context()->QueryCursorClose(javaRef); + + // 2. Release Java reference. + JniContext::Release(javaRef); + } + + bool QueryCursorImpl::HasNext(GridError* err) + { + // Check whether GetAll() was called earlier. + if (getAllCalled) + { + *err = GridError(GridError::GG_ERR_GENERIC, + "Cannot use HasNext() method because GetAll() was called."); + + return false; + } + + // Create iterator in Java if needed. 
+ if (!CreateIteratorIfNeeded(err)) + return false; + + return hasNext; + } + + void QueryCursorImpl::GetNext(OutputOperation& op, GridError* err) + { + // Check whether GetAll() was called earlier. + if (getAllCalled) + { + *err = GridError(GridError::GG_ERR_GENERIC, + "Cannot use GetNext() method because GetAll() was called."); + + return; + } + + // Create iterator in Java if needed. + if (!CreateIteratorIfNeeded(err)) + return; + + if (hasNext) + { + JniErrorInfo jniErr; + + SharedPointer<InteropMemory> inMem = env.Get()->AllocateMemory(); + + env.Get()->Context()->TargetOutStream(javaRef, OP_GET_SINGLE, inMem.Get()->PointerLong(), &jniErr); + + GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + { + InteropInputStream in(inMem.Get()); + + portable::PortableReaderImpl reader(&in); + + op.ProcessOutput(reader); + + hasNext = IteratorHasNext(err); + } + } + else + { + // Ensure we do not overwrite possible previous error. + if (err->GetCode() == GridError::GG_SUCCESS) + *err = GridError(GridError::GG_ERR_GENERIC, "No more elements available."); + } + } + + void QueryCursorImpl::GetAll(OutputOperation& op, GridError* err) + { + // Check whether any of iterator methods were called. + if (iterCalled) + { + *err = GridError(GridError::GG_ERR_GENERIC, + "Cannot use GetAll() method because an iteration method was called."); + + return; + } + + // Check whether GetAll was called before. + if (getAllCalled) + { + *err = GridError(GridError::GG_ERR_GENERIC, + "Cannot use GetNext() method because GetAll() was called."); + + return; + } + + // Get data. 
+ JniErrorInfo jniErr; + + SharedPointer<InteropMemory> inMem = env.Get()->AllocateMemory(); + + env.Get()->Context()->TargetOutStream(javaRef, OP_GET_ALL, inMem.Get()->PointerLong(), &jniErr); + + GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + { + getAllCalled = true; + + InteropInputStream in(inMem.Get()); + + portable::PortableReaderImpl reader(&in); + + op.ProcessOutput(reader); + } + } + + bool QueryCursorImpl::CreateIteratorIfNeeded(GridError* err) + { + if (!iterCalled) + { + JniErrorInfo jniErr; + + env.Get()->Context()->QueryCursorIterator(javaRef, &jniErr); + + GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + { + iterCalled = true; + + hasNext = IteratorHasNext(err); + } + else + return false; + } + + return true; + } + + bool QueryCursorImpl::IteratorHasNext(GridError* err) + { + JniErrorInfo jniErr; + + bool res = env.Get()->Context()->QueryCursorIteratorHasNext(javaRef, &jniErr); + + GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + return jniErr.code == IGNITE_JNI_ERR_SUCCESS && res; + } + } + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/grid_environment.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/grid_environment.cpp b/modules/platform/src/main/cpp/core/src/impl/grid_environment.cpp new file mode 100644 index 0000000..f0e243b --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/grid_environment.cpp @@ -0,0 +1,158 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include "gridgain/impl/portable/portable_reader_impl.h" +#include "gridgain/impl/grid_environment.h" +#include "gridgain/portable/portable.h" + +using namespace ignite::common::concurrent; +using namespace ignite::common::java; +using namespace gridgain::impl::interop; +using namespace gridgain::impl::portable; +using namespace gridgain::portable; + +namespace gridgain +{ + namespace impl + { + /** + * OnStart callback. + * + * @param target Target environment. + * @param memPtr Memory pointer. + */ + void IGNITE_CALL OnStart(void* target, long long memPtr) + { + SharedPointer<GridEnvironment>* ptr = static_cast<SharedPointer<GridEnvironment>*>(target); + + ptr->Get()->OnStartCallback(memPtr); + } + + /** + * OnStop callback. + * + * @param target Target environment. + */ + void IGNITE_CALL OnStop(void* target) + { + SharedPointer<GridEnvironment>* ptr = static_cast<SharedPointer<GridEnvironment>*>(target); + + delete ptr; + } + + GridEnvironment::GridEnvironment() : ctx(SharedPointer<JniContext>()), latch(new SingleLatch), gridName(NULL), + metaMgr(new PortableMetadataManager()) + { + // No-op. 
+ } + + GridEnvironment::~GridEnvironment() + { + delete latch; + + if (gridName) + delete gridName; + + delete metaMgr; + } + + JniHandlers GridEnvironment::GetJniHandlers(SharedPointer<GridEnvironment>* target) + { + JniHandlers hnds = JniHandlers(); + + hnds.target = target; + + hnds.onStart = OnStart; + hnds.onStop = OnStop; + + hnds.error = NULL; + + return hnds; + } + + void GridEnvironment::Initialize(SharedPointer<JniContext> ctx) + { + this->ctx = ctx; + + latch->CountDown(); + } + + char* GridEnvironment::GridName() + { + return gridName; + } + + JniContext* GridEnvironment::Context() + { + return ctx.Get(); + } + + SharedPointer<InteropMemory> GridEnvironment::AllocateMemory() + { + SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(1024)); + + return ptr; + } + + SharedPointer<InteropMemory> GridEnvironment::AllocateMemory(int32_t cap) + { + SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(cap)); + + return ptr; + } + + SharedPointer<InteropMemory> GridEnvironment::GetMemory(int64_t memPtr) + { + int8_t* memPtr0 = reinterpret_cast<int8_t*>(memPtr); + + int32_t flags = InteropMemory::Flags(memPtr0); + + if (InteropMemory::IsExternal(flags)) + { + SharedPointer<InteropMemory> ptr(new InteropExternalMemory(memPtr0)); + + return ptr; + } + else + { + SharedPointer<InteropMemory> ptr(new InteropUnpooledMemory(memPtr0)); + + return ptr; + } + } + + PortableMetadataManager* GridEnvironment::GetMetadataManager() + { + return metaMgr; + } + + void GridEnvironment::OnStartCallback(long long memPtr) + { + InteropExternalMemory mem(reinterpret_cast<int8_t*>(memPtr)); + InteropInputStream stream(&mem); + + PortableReaderImpl reader(&stream); + + int32_t gridNameLen = reader.ReadString(NULL, 0); + + if (gridNameLen >= 0) + { + gridName = new char[gridNameLen + 1]; + reader.ReadString(gridName, gridNameLen + 1); + } + else + gridName = NULL; + } + } +} + + + + + 
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/grid_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/grid_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/grid_impl.cpp new file mode 100644 index 0000000..43222e1 --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/grid_impl.cpp @@ -0,0 +1,34 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include "gridgain/impl/grid_impl.h" + +using namespace ignite::common::concurrent; +using namespace ignite::common::java; + +namespace gridgain +{ + namespace impl + { + GridImpl::GridImpl(SharedPointer<GridEnvironment> env, jobject javaRef) : env(env), javaRef(javaRef) + { + // No-op. + } + + GridImpl::~GridImpl() + { + JniContext::Release(javaRef); + } + + char* GridImpl::GetName() + { + return env.Get()->GridName(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp b/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp new file mode 100644 index 0000000..a4eafc0 --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp @@ -0,0 +1,226 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. 
+ * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include "gridgain/impl/handle_registry.h" + +using namespace ignite::common::concurrent; + +namespace gridgain +{ + namespace impl + { + HandleRegistryEntry::~HandleRegistryEntry() + { + // No-op. + } + + HandleRegistrySegment::HandleRegistrySegment() : + map(new std::map<int64_t, SharedPointer<HandleRegistryEntry>>()), mux(new CriticalSection()) + { + // No-op. + } + + HandleRegistrySegment::~HandleRegistrySegment() + { + delete map; + delete mux; + } + + SharedPointer<HandleRegistryEntry> HandleRegistrySegment::Get(int64_t hnd) + { + mux->Enter(); + + SharedPointer<HandleRegistryEntry> res = (*map)[hnd]; + + mux->Leave(); + + return res; + } + + void HandleRegistrySegment::Put(int64_t hnd, const SharedPointer<HandleRegistryEntry>& entry) + { + mux->Enter(); + + (*map)[hnd] = entry; + + mux->Leave(); + } + + void HandleRegistrySegment::Remove(int64_t hnd) + { + mux->Enter(); + + map->erase(hnd); + + mux->Leave(); + } + + void HandleRegistrySegment::Clear() + { + mux->Enter(); + + map->erase(map->begin(), map->end()); + + mux->Leave(); + } + + HandleRegistry::HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt) + { + this->fastCap = fastCap; + + fastCtr = 0; + + fast = new SharedPointer<HandleRegistryEntry>[fastCap]; + + for (int i = 0; i < fastCap; i++) + fast[i] = SharedPointer<HandleRegistryEntry>(); + + this->slowSegmentCnt = slowSegmentCnt; + + slowCtr = fastCap; + + slow = new HandleRegistrySegment*[slowSegmentCnt]; + + for (int i = 0; i < slowSegmentCnt; i++) + slow[i] = new HandleRegistrySegment(); + + closed = 0; + + Memory::Fence(); + } + + HandleRegistry::~HandleRegistry() + { + Close(); + + delete[] fast; + + for (int i = 0; i < slowSegmentCnt; i++) + delete slow[i]; + + 
delete[] slow; + } + + int64_t HandleRegistry::Allocate(const SharedPointer<HandleRegistryEntry>& target) + { + return Allocate0(target, false, false); + } + + int64_t HandleRegistry::AllocateCritical(const SharedPointer<HandleRegistryEntry>& target) + { + return Allocate0(target, true, false); + } + + int64_t HandleRegistry::AllocateSafe(const SharedPointer<HandleRegistryEntry>& target) + { + return Allocate0(target, false, true); + } + + int64_t HandleRegistry::AllocateCriticalSafe(const SharedPointer<HandleRegistryEntry>& target) + { + return Allocate0(target, true, true); + } + + void HandleRegistry::Release(int64_t hnd) + { + if (hnd < fastCap) + fast[static_cast<int32_t>(hnd)] = SharedPointer<HandleRegistryEntry>(); + else + { + HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt); + + segment->Remove(hnd); + } + + Memory::Fence(); + } + + SharedPointer<HandleRegistryEntry> HandleRegistry::Get(int64_t hnd) + { + Memory::Fence(); + + if (hnd < fastCap) + return fast[static_cast<int32_t>(hnd)]; + else + { + HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt); + + return segment->Get(hnd); + } + } + + void HandleRegistry::Close() + { + if (Atomics::CompareAndSet32(&closed, 0, 1)) + { + // Cleanup fast-path handles. + for (int i = 0; i < fastCap; i++) + fast[i] = SharedPointer<HandleRegistryEntry>(); + + // Cleanup slow-path handles. + for (int i = 0; i < slowSegmentCnt; i++) + (*(slow + i))->Clear(); + } + } + + int64_t HandleRegistry::Allocate0(const SharedPointer<HandleRegistryEntry>& target, bool critical, bool safe) + { + // Check closed state. + Memory::Fence(); + + if (closed == 1) + return -1; + + // Try allocating entry on critical path. + if (critical) + { + if (fastCtr < fastCap) + { + int32_t fastIdx = Atomics::IncrementAndGet32(&fastCtr) - 1; + + if (fastIdx < fastCap) + { + fast[fastIdx] = target; + + // Double-check for closed state if safe mode is on. 
+ Memory::Fence(); + + if (safe && closed == 1) + { + fast[fastIdx] = SharedPointer<HandleRegistryEntry>(); + + return -1; + } + else + return fastIdx; + } + } + } + + // Either allocating on slow-path, or fast-path can no longer accomodate more entries. + int64_t slowIdx = Atomics::IncrementAndGet64(&slowCtr) - 1; + + HandleRegistrySegment* segment = *(slow + slowIdx % slowSegmentCnt); + + segment->Put(slowIdx, target); + + // Double-check for closed state if safe mode is on. + Memory::Fence(); + + if (safe && closed == 1) + { + segment->Remove(slowIdx); + + return -1; + } + + return slowIdx; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp b/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp new file mode 100644 index 0000000..c166847 --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp @@ -0,0 +1,207 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include <cstring> + +#include "gridgain/impl/interop/interop_input_stream.h" +#include "gridgain/grid_error.h" + +/** + * Common macro to read a single value. + */ +#define GG_INTEROP_IN_READ(type, len) { \ + EnsureEnoughData(len); \ + type res = *reinterpret_cast<type*>(data + pos); \ + Shift(len); \ + return res; \ +} + +/** + * Common macro to read an array. 
+ */ +#define GG_INTEROP_IN_READ_ARRAY(len, shift) { \ + CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << shift); \ +} + +namespace gridgain +{ + namespace impl + { + namespace interop + { + union PortableInt32Float + { + int32_t i; + float f; + }; + + union PortableInt64Double + { + int64_t i; + double d; + }; + + InteropInputStream::InteropInputStream(InteropMemory* mem) + { + this->mem = mem; + + data = mem->Data(); + len = mem->Length(); + pos = 0; + } + + int8_t InteropInputStream::ReadInt8() + { + GG_INTEROP_IN_READ(int8_t, 1); + } + + void InteropInputStream::ReadInt8Array(int8_t* const res, const int32_t len) + { + GG_INTEROP_IN_READ_ARRAY(len, 0); + } + + bool InteropInputStream::ReadBool() + { + return ReadInt8() == 1; + } + + void InteropInputStream::ReadBoolArray(bool* const res, const int32_t len) + { + for (int i = 0; i < len; i++) + *(res + i) = ReadBool(); + } + + int16_t InteropInputStream::ReadInt16() + { + GG_INTEROP_IN_READ(int16_t, 2); + } + + void InteropInputStream::ReadInt16Array(int16_t* const res, const int32_t len) + { + GG_INTEROP_IN_READ_ARRAY(len, 1); + } + + uint16_t InteropInputStream::ReadUInt16() + { + GG_INTEROP_IN_READ(uint16_t, 2); + } + + void InteropInputStream::ReadUInt16Array(uint16_t* const res, const int32_t len) + { + GG_INTEROP_IN_READ_ARRAY(len, 1); + } + + int32_t InteropInputStream::ReadInt32() + { + GG_INTEROP_IN_READ(int32_t, 4); + } + + int32_t InteropInputStream::ReadInt32(int32_t pos) + { + int delta = pos + 4 - this->pos; + + if (delta > 0) + EnsureEnoughData(delta); + + return *reinterpret_cast<int32_t*>(data + pos); + } + + void InteropInputStream::ReadInt32Array(int32_t* const res, const int32_t len) + { + GG_INTEROP_IN_READ_ARRAY(len, 2); + } + + int64_t InteropInputStream::ReadInt64() + { + GG_INTEROP_IN_READ(int64_t, 8); + } + + void InteropInputStream::ReadInt64Array(int64_t* const res, const int32_t len) + { + GG_INTEROP_IN_READ_ARRAY(len, 3); + } + + float InteropInputStream::ReadFloat() + { + 
PortableInt32Float u; + + u.i = ReadInt32(); + + return u.f; + } + + void InteropInputStream::ReadFloatArray(float* const res, const int32_t len) + { + GG_INTEROP_IN_READ_ARRAY(len, 2); + } + + double InteropInputStream::ReadDouble() + { + PortableInt64Double u; + + u.i = ReadInt64(); + + return u.d; + } + + void InteropInputStream::ReadDoubleArray(double* const res, const int32_t len) + { + GG_INTEROP_IN_READ_ARRAY(len, 3); + } + + int32_t InteropInputStream::Remaining() + { + return len - pos; + } + + int32_t InteropInputStream::Position() + { + return pos; + } + + void InteropInputStream::Position(int32_t pos) + { + if (pos > len) { + GG_ERROR_FORMATTED_3(GridError::GG_ERR_MEMORY, "Requested input stream position is out of bounds", + "memPtr", mem->PointerLong(), "len", len, "pos", pos); + } + + this->pos = pos; + } + + void InteropInputStream::Synchronize() + { + data = mem->Data(); + len = mem->Length(); + } + + void InteropInputStream::EnsureEnoughData(int32_t cnt) + { + if (len - pos < cnt) { + GG_ERROR_FORMATTED_4(GridError::GG_ERR_MEMORY, "Not enough data in the stream", + "memPtr", mem->PointerLong(), "len", len, "pos", pos, "requested", cnt); + } + } + + void InteropInputStream::CopyAndShift(int8_t* dest, int32_t off, int32_t cnt) + { + EnsureEnoughData(cnt); + + memcpy(dest + off, data + pos, cnt); + + Shift(cnt); + } + + void InteropInputStream::Shift(int32_t cnt) + { + pos += cnt; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp b/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp new file mode 100644 index 0000000..0402894 --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp @@ -0,0 +1,173 @@ +/* + * Copyright (C) GridGain Systems. 
All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ +#include <ignite/common/java.h> + +#include "gridgain/impl/interop/interop_memory.h" +#include "gridgain/grid_error.h" + +using namespace ignite::common::java; + +namespace gridgain +{ + namespace impl + { + namespace interop + { + int8_t* InteropMemory::Data(int8_t* memPtr) + { + return reinterpret_cast<int8_t*>(*reinterpret_cast<int64_t*>(memPtr)); + } + + void InteropMemory::Data(int8_t* memPtr, void* ptr) + { + *reinterpret_cast<int64_t*>(memPtr) = reinterpret_cast<int64_t>(ptr); + } + + int32_t InteropMemory::Capacity(int8_t* memPtr) + { + return *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_CAP); + } + + void InteropMemory::Capacity(int8_t* memPtr, int32_t val) + { + *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_CAP) = val; + } + + int32_t InteropMemory::Length(int8_t* memPtr) + { + return *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_LEN); + } + + void InteropMemory::Length(int8_t* memPtr, int32_t val) + { + *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_LEN) = val; + } + + int32_t InteropMemory::Flags(int8_t* memPtr) + { + return *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_FLAGS); + } + + void InteropMemory::Flags(int8_t* memPtr, int32_t val) + { + *reinterpret_cast<int32_t*>(memPtr + GG_MEM_HDR_OFF_FLAGS) = val; + } + + bool InteropMemory::IsExternal(int8_t* memPtr) + { + return IsExternal(Flags(memPtr)); + } + + bool InteropMemory::IsExternal(int32_t flags) + { + return (flags & GG_MEM_FLAG_EXT) != GG_MEM_FLAG_EXT; + } + + bool InteropMemory::IsPooled(int8_t* memPtr) + { + return IsPooled(Flags(memPtr)); + } + + bool InteropMemory::IsPooled(int32_t flags) + { + return (flags & GG_MEM_FLAG_POOLED) != 0; + } + + bool 
InteropMemory::IsAcquired(int8_t* memPtr) + { + return IsAcquired(Flags(memPtr)); + } + + bool InteropMemory::IsAcquired(int32_t flags) + { + return (flags & GG_MEM_FLAG_ACQUIRED) != 0; + } + + int8_t* InteropMemory::Pointer() + { + return memPtr; + } + + int64_t InteropMemory::PointerLong() + { + return reinterpret_cast<int64_t>(memPtr); + } + + int8_t* InteropMemory::Data() + { + return Data(memPtr); + } + + int32_t InteropMemory::Capacity() + { + return Capacity(memPtr); + } + + int32_t InteropMemory::Length() + { + return Length(memPtr); + } + + void InteropMemory::Length(int32_t val) + { + Length(memPtr, val); + } + + InteropUnpooledMemory::InteropUnpooledMemory(int32_t cap) + { + memPtr = static_cast<int8_t*>(malloc(GG_MEM_HDR_LEN)); + + Data(memPtr, malloc(cap)); + Capacity(memPtr, cap); + Length(memPtr, 0); + Flags(memPtr, GG_MEM_FLAG_EXT); + + owning = true; + } + + InteropUnpooledMemory::InteropUnpooledMemory(int8_t* memPtr) + { + this->memPtr = memPtr; + this->owning = false; + } + + InteropUnpooledMemory::~InteropUnpooledMemory() + { + if (owning) { + free(Data()); + free(memPtr); + } + } + + void InteropUnpooledMemory::Reallocate(int32_t cap) + { + int doubledCap = Capacity() << 1; + + if (doubledCap > cap) + cap = doubledCap; + + Data(memPtr, realloc(Data(memPtr), cap)); + Capacity(memPtr, cap); + } + + InteropExternalMemory::InteropExternalMemory(int8_t* memPtr) + { + this->memPtr = memPtr; + } + + void InteropExternalMemory::Reallocate(int32_t cap) + { + if (JniContext::Reallocate(reinterpret_cast<int64_t>(memPtr), cap) == -1) { + GG_ERROR_FORMATTED_2(GridError::GG_ERR_MEMORY, "Failed to reallocate external memory", + "memPtr", PointerLong(), "requestedCapacity", cap) + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp ---------------------------------------------------------------------- diff --git 
a/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp b/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp new file mode 100644 index 0000000..351f851 --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp @@ -0,0 +1,207 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include <cstring> + +#include "gridgain/impl/interop/interop_output_stream.h" +#include "gridgain/grid_error.h" + +/** + * Common macro to write a single value. + */ +#define GG_INTEROP_OUT_WRITE(val, type, len) { \ + EnsureCapacity(pos + len); \ + *reinterpret_cast<type*>(data + pos) = val; \ + Shift(len); \ +} + +/** + * Common macro to write an array. + */ +#define GG_INTEROP_OUT_WRITE_ARRAY(val, len) { \ + CopyAndShift(reinterpret_cast<const int8_t*>(val), 0, len); \ +} + +namespace gridgain +{ + namespace impl + { + namespace interop + { + union PortableFloatInt32 + { + float f; + int32_t i; + }; + + union PortableDoubleInt64 + { + double d; + int64_t i; + }; + + InteropOutputStream::InteropOutputStream(InteropMemory* mem) + { + this->mem = mem; + + data = mem->Data(); + cap = mem->Capacity(); + pos = 0; + } + + void InteropOutputStream::WriteInt8(const int8_t val) + { + GG_INTEROP_OUT_WRITE(val, int8_t, 1); + } + + void InteropOutputStream::WriteInt8(const int8_t val, const int32_t pos) + { + EnsureCapacity(pos + 1); + + *(data + pos) = val; + } + + void InteropOutputStream::WriteInt8Array(const int8_t* val, const int32_t len) + { + GG_INTEROP_OUT_WRITE_ARRAY(val, len); + } + + void InteropOutputStream::WriteBool(const bool val) + { + WriteInt8(val ? 
1 : 0); + } + + void InteropOutputStream::WriteBoolArray(const bool* val, const int32_t len) + { + for (int i = 0; i < len; i++) + WriteBool(*(val + i)); + } + + void InteropOutputStream::WriteInt16(const int16_t val) + { + GG_INTEROP_OUT_WRITE(val, int16_t, 2); + } + + void InteropOutputStream::WriteInt16Array(const int16_t* val, const int32_t len) + { + GG_INTEROP_OUT_WRITE_ARRAY(val, len << 1); + } + + void InteropOutputStream::WriteUInt16(const uint16_t val) + { + GG_INTEROP_OUT_WRITE(val, uint16_t, 2); + } + + void InteropOutputStream::WriteUInt16Array(const uint16_t* val, const int32_t len) + { + GG_INTEROP_OUT_WRITE_ARRAY(val, len << 1); + } + + void InteropOutputStream::WriteInt32(const int32_t val) + { + GG_INTEROP_OUT_WRITE(val, int32_t, 4); + } + + void InteropOutputStream::WriteInt32(const int32_t pos, const int32_t val) + { + EnsureCapacity(pos + 4); + + *reinterpret_cast<int32_t*>(data + pos) = val; + } + + void InteropOutputStream::WriteInt32Array(const int32_t* val, const int32_t len) + { + GG_INTEROP_OUT_WRITE_ARRAY(val, len << 2); + } + + void InteropOutputStream::WriteInt64(const int64_t val) + { + GG_INTEROP_OUT_WRITE(val, int64_t, 8); + } + + void InteropOutputStream::WriteInt64Array(const int64_t* val, const int32_t len) + { + GG_INTEROP_OUT_WRITE_ARRAY(val, len << 3); + } + + void InteropOutputStream::WriteFloat(const float val) + { + PortableFloatInt32 u; + + u.f = val; + + WriteInt32(u.i); + } + + void InteropOutputStream::WriteFloatArray(const float* val, const int32_t len) + { + for (int i = 0; i < len; i++) + WriteFloat(*(val + i)); + } + + void InteropOutputStream::WriteDouble(const double val) + { + PortableDoubleInt64 u; + + u.d = val; + + WriteInt64(u.i); + } + + void InteropOutputStream::WriteDoubleArray(const double* val, const int32_t len) + { + for (int i = 0; i < len; i++) + WriteDouble(*(val + i)); + } + + int32_t InteropOutputStream::Position() + { + return pos; + } + + void InteropOutputStream::Position(const int32_t val) + { 
+ EnsureCapacity(val); + + pos = val; + } + + void InteropOutputStream::Synchronize() + { + mem->Length(pos); + } + + void InteropOutputStream::EnsureCapacity(int32_t reqCap) { + if (reqCap > cap) { + int newCap = cap << 1; + + if (newCap < reqCap) + newCap = reqCap; + + mem->Reallocate(newCap); + data = mem->Data(); + cap = newCap; + } + } + + void InteropOutputStream::Shift(int32_t cnt) { + pos += cnt; + } + + void InteropOutputStream::CopyAndShift(const int8_t* src, int32_t off, int32_t len) { + EnsureCapacity(pos + len); + + memcpy(data + pos, src + off, len); + + Shift(len); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp new file mode 100644 index 0000000..08dc65d --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp @@ -0,0 +1,70 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include "gridgain/impl/portable/portable_metadata_handler.h" + +using namespace ignite::common::concurrent; + +namespace gridgain +{ + namespace impl + { + namespace portable + { + PortableMetadataHandler::PortableMetadataHandler(SPSnap snap) : snap(snap), fieldIds(NULL), fields(NULL) + { + // No-op. 
+ } + + PortableMetadataHandler::~PortableMetadataHandler() + { + if (fieldIds) + delete fieldIds; + + if (fields) + delete fields; + } + + void PortableMetadataHandler::OnFieldWritten(int32_t fieldId, std::string fieldName, int32_t fieldTypeId) + { + if (!snap.Get() || !snap.Get()->ContainsFieldId(fieldId)) + { + if (!HasDifference()) + { + fieldIds = new std::set<int32_t>(); + fields = new std::map<std::string, int32_t>(); + } + + fieldIds->insert(fieldId); + (*fields)[fieldName] = fieldTypeId; + } + } + + SPSnap PortableMetadataHandler::GetSnapshot() + { + return snap; + } + + bool PortableMetadataHandler::HasDifference() + { + return fieldIds ? true : false; + } + + std::set<int32_t>* PortableMetadataHandler::GetFieldIds() + { + return fieldIds; + } + + std::map<std::string, int32_t>* PortableMetadataHandler::GetFields() + { + return fields; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp new file mode 100644 index 0000000..d1076a8 --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp @@ -0,0 +1,193 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. 
+ * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include <ignite/common/concurrent.h> + +#include "gridgain/impl/portable/portable_metadata_manager.h" + +using namespace ignite::common::concurrent; + +namespace gridgain +{ + namespace impl + { + namespace portable + { + /** Constructor: starts with an empty snapshot map, an empty pending queue and both versions at zero. */ + PortableMetadataManager::PortableMetadataManager() : + snapshots(SharedPointer<std::map<int32_t, SPSnap>>(new std::map<int32_t, SPSnap>)), + pending(new std::vector<SPSnap>()), + cs(new CriticalSection()), + pendingVer(0), ver(0) + { + // No-op. + } + + /** Destructor: drops pending snapshots and frees the queue and the critical section. */ + PortableMetadataManager::~PortableMetadataManager() + { + pending->erase(pending->begin(), pending->end()); + + delete pending; + delete cs; + } + + /** + * Creates a metadata handler for the given type ID. + * + * @param typeId Type ID. + * @return Handler bound to the current snapshot of the type, or to an empty snapshot pointer + * when the type is not known yet. + */ + SharedPointer<PortableMetadataHandler> PortableMetadataManager::GetHandler(int32_t typeId) + { + SharedPointer<std::map<int32_t, SPSnap>> snapshots0 = snapshots; + + // Use find() rather than operator[]: operator[] would insert an empty entry for an unknown + // type ID, mutating the copy-on-write snapshot map outside the critical section. + std::map<int32_t, SPSnap>::iterator found = snapshots0.Get()->find(typeId); + + SPSnap snapshot = found != snapshots0.Get()->end() ? found->second : SPSnap(); + + return SharedPointer<PortableMetadataHandler>(new PortableMetadataHandler(snapshot)); + } + + /** + * Enqueues metadata collected by a handler for a later update. + * + * @param typeName Type name. + * @param typeId Type ID. + * @param hnd Handler which observed the write. + */ + void PortableMetadataManager::SubmitHandler(std::string typeName, int32_t typeId, + PortableMetadataHandler* hnd) + { + Snap* snap = hnd->GetSnapshot().Get(); + + // If this is the very first write of a class or difference exists, + // we need to enqueue it for write. 
+ if (!snap || hnd->HasDifference()) + { + std::set<int32_t>* newFieldIds = new std::set<int32_t>(); + std::map<std::string, int32_t>* newFields = new std::map<std::string, int32_t>(); + + // Start from the fields of the handler's snapshot (no-op when there is none). + CopyFields(snap, newFieldIds, newFields); + + if (hnd->HasDifference()) + { + std::set<int32_t>* diffFieldIds = hnd->GetFieldIds(); + std::map<std::string, int32_t>* diffFields = hnd->GetFields(); + + for (std::set<int32_t>::iterator it = diffFieldIds->begin(); it != diffFieldIds->end(); ++it) + newFieldIds->insert(*it); + + for (std::map<std::string, int32_t>::iterator it = diffFields->begin(); it != diffFields->end(); ++it) + (*newFields)[it->first] = it->second; + } + + // The new snapshot receives the freshly allocated containers. + Snap* diffSnap = new Snap(typeName, typeId, newFieldIds, newFields); + + cs->Enter(); + + pending->push_back(SPSnap(diffSnap)); + + pendingVer++; + + cs->Leave(); + } + } + + /** Returns the version recorded by the last successful ProcessPendingUpdates() call. */ + int32_t PortableMetadataManager::GetVersion() + { + Memory::Fence(); + + return ver; + } + + /** Tells whether the pending version is greater than the given one. */ + bool PortableMetadataManager::IsUpdatedSince(int32_t oldVer) + { + Memory::Fence(); + + return pendingVer > oldVer; + } + + /** + * Passes every pending snapshot to the updater and merges accepted ones into the snapshot map. + * + * NOTE(review): when the updater rejects a snapshot the loop stops and the pending queue is left + * untouched, so snapshots accepted earlier in the same call are submitted again next time. + * + * @param updater Updater. + * @param err Error, passed through to the updater. + * @return True when all pending snapshots were accepted. + */ + bool PortableMetadataManager::ProcessPendingUpdates(PortableMetadataUpdater* updater, GridError* err) + { + bool success = true; // Optimistically assume that all will be fine. + + cs->Enter(); + + for (std::vector<SPSnap>::iterator it = pending->begin(); it != pending->end(); ++it) + { + Snap* pendingSnap = (*it).Get(); + + if (updater->Update(pendingSnap, err)) + { + // Perform copy-on-write update of snapshot collection. + std::map<int32_t, SPSnap>* newSnapshots = new std::map<int32_t, SPSnap>(); + + bool snapshotFound = false; + + for (std::map<int32_t, SPSnap>::iterator snapIt = snapshots.Get()->begin(); + snapIt != snapshots.Get()->end(); ++snapIt) + { + int32_t curTypeId = snapIt->first; + Snap* curSnap = snapIt->second.Get(); + + if (pendingSnap->GetTypeId() == curTypeId) + { + // Have to create snapshot with updated fields. 
+ std::set<int32_t>* newFieldIds = new std::set<int32_t>(); + std::map<std::string, int32_t>* newFields = new std::map<std::string, int32_t>(); + + // Add old fields. + CopyFields(curSnap, newFieldIds, newFields); + + // Add new fields. + CopyFields(pendingSnap, newFieldIds, newFields); + + // Create new snapshot. + Snap* newSnap = new Snap(pendingSnap->GetTypeName(), pendingSnap->GetTypeId(), + newFieldIds, newFields); + + (*newSnapshots)[curTypeId] = SPSnap(newSnap); + + snapshotFound = true; + } + else + (*newSnapshots)[curTypeId] = snapIt->second; // Just transfer existing snapshot. + } + + // Handle situation when completely new snapshot is found. + if (!snapshotFound) + (*newSnapshots)[pendingSnap->GetTypeId()] = *it; + + snapshots = SharedPointer<std::map<int32_t, SPSnap>>(newSnapshots); + } + else + { + // Stop as we cannot move further. + success = false; + + break; + } + } + + if (success) + { + pending->erase(pending->begin(), pending->end()); + + ver = pendingVer; + } + + cs->Leave(); + + return success; + } + + /** + * Copies field IDs and field name to type ID pairs of a snapshot into the given containers. + * Does nothing when the snapshot is null or has no fields. + * + * @param snap Source snapshot, may be null. + * @param fieldIds Target field ID set. + * @param fields Target field map. + */ + void PortableMetadataManager::CopyFields(Snap* snap, std::set<int32_t>* fieldIds, + std::map<std::string, int32_t>* fields) + { + if (snap && snap->HasFields()) + { + std::set<int32_t>* snapFieldIds = snap->GetFieldIds(); + std::map<std::string, int32_t>* snapFields = snap->GetFields(); + + for (std::set<int32_t>::iterator oldIt = snapFieldIds->begin(); + oldIt != snapFieldIds->end(); ++oldIt) + fieldIds->insert(*oldIt); + + for (std::map<std::string, int32_t>::iterator newFieldsIt = snapFields->begin(); + newFieldsIt != snapFields->end(); ++newFieldsIt) + (*fields)[newFieldsIt->first] = newFieldsIt->second; + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp ---------------------------------------------------------------------- diff --git 
a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp new file mode 100644 index 0000000..18cc2ce --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp @@ -0,0 +1,62 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include "gridgain/impl/portable/portable_metadata_snapshot.h" + +namespace gridgain +{ + namespace impl + { + namespace portable + { + /** + * Constructor. The snapshot deletes both containers in its destructor. + * + * @param typeName Type name. + * @param typeId Type ID. + * @param fieldIds Field IDs. + * @param fields Field name to field type ID mapping. + */ + PortableMetadataSnapshot::PortableMetadataSnapshot(std::string typeName, int32_t typeId, + std::set<int32_t>* fieldIds, std::map<std::string, int32_t>* fields) : + typeName(typeName), typeId(typeId), fieldIds(fieldIds), fields(fields) + { + // No-op. 
+ } + + /** Destructor: frees the field containers passed to the constructor. */ + PortableMetadataSnapshot::~PortableMetadataSnapshot() + { + delete fieldIds; + delete fields; + } + + /** Tells whether the given field ID is part of this snapshot (false when there is no ID set). */ + bool PortableMetadataSnapshot::ContainsFieldId(int32_t fieldId) + { + return fieldIds && fieldIds->count(fieldId) == 1; + } + + /** Returns type name. */ + std::string PortableMetadataSnapshot::GetTypeName() + { + return typeName; + } + + /** Returns type ID. */ + int32_t PortableMetadataSnapshot::GetTypeId() + { + return typeId; + } + + /** Tells whether the snapshot has at least one field ID. */ + bool PortableMetadataSnapshot::HasFields() + { + // Guard against a null set, consistently with ContainsFieldId(). + return fieldIds && !fieldIds->empty(); + } + + /** Returns field IDs. */ + std::set<int32_t>* PortableMetadataSnapshot::GetFieldIds() + { + return fieldIds; + } + + /** Returns field name to field type ID mapping. */ + std::map<std::string, int32_t>* PortableMetadataSnapshot::GetFields() + { + return fields; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp new file mode 100644 index 0000000..3022b72 --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp @@ -0,0 +1,24 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include "gridgain/impl/portable/portable_metadata_updater.h" + +namespace gridgain +{ + namespace impl + { + namespace portable + { + /** Destructor. */ + PortableMetadataUpdater::~PortableMetadataUpdater() + { + // No-op. 
+ } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp new file mode 100644 index 0000000..111ccfd --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp @@ -0,0 +1,86 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. + * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include "gridgain/impl/portable/portable_metadata_updater_impl.h" +#include <gridgain/impl/interop/interop_output_stream.h> +#include <gridgain/impl/portable/portable_writer_impl.h> +#include <gridgain/portable/portable_raw_writer.h> + +using namespace ignite::common::concurrent; +using namespace ignite::common::java; +using namespace gridgain::impl; +using namespace gridgain::impl::interop; +using namespace gridgain::portable; + +namespace gridgain +{ + namespace impl + { + namespace portable + { + /** Operation: metadata update. */ + const int32_t OP_METADATA = -1; + + /** + * Constructor. + * + * @param env Environment. + * @param javaRef Java object reference used as the target of the update operation. + */ + PortableMetadataUpdaterImpl::PortableMetadataUpdaterImpl(SharedPointer<GridEnvironment> env, + jobject javaRef) : env(env), javaRef(javaRef) + { + // No-op. + } + + /** Destructor. */ + PortableMetadataUpdaterImpl::~PortableMetadataUpdaterImpl() + { + // No-op. 
+ } + + /** + * Serializes a single snapshot and passes it to the Java target with OP_METADATA. + * + * @param snap Snapshot to send. + * @param err Error, set from the JNI error info of the call. + * @return True when the call succeeded and returned 1. + */ + bool PortableMetadataUpdaterImpl::Update(Snap* snap, GridError* err) + { + JniErrorInfo jniErr; + + SharedPointer<InteropMemory> mem = env.Get()->AllocateMemory(); + + InteropOutputStream out(mem.Get()); + PortableWriterImpl writer(&out, NULL); + PortableRawWriter rawWriter(&writer); + + // We always pass only one meta at a time in current implementation for simplicity. + rawWriter.WriteInt32(1); + + rawWriter.WriteInt32(snap->GetTypeId()); + rawWriter.WriteString(snap->GetTypeName()); + rawWriter.WriteString(NULL); // Affinity key is not supported for now. + + // Field count followed by (name, type ID) pairs; zero count when there are no fields. + if (snap->HasFields()) + { + std::map<std::string, int32_t>* fields = snap->GetFields(); + + rawWriter.WriteInt32(static_cast<int32_t>(fields->size())); + + for (std::map<std::string, int32_t>::iterator it = fields->begin(); it != fields->end(); ++it) + { + rawWriter.WriteString(it->first); + rawWriter.WriteInt32(it->second); + } + } + else + rawWriter.WriteInt32(0); + + out.Synchronize(); + + long long res = env.Get()->Context()->TargetInStreamOutLong(javaRef, OP_METADATA, mem.Get()->PointerLong(), &jniErr); + + GridError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + return res == 1; + else + return false; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1e18fa32/modules/platform/src/main/cpp/core/src/impl/portable/portable_reader_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_reader_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_reader_impl.cpp new file mode 100644 index 0000000..03df9e4 --- /dev/null +++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_reader_impl.cpp @@ -0,0 +1,675 @@ +/* + * Copyright (C) GridGain Systems. All Rights Reserved. 
+ * _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +#include "gridgain/impl/interop/interop.h" +#include "gridgain/impl/portable/portable_common.h" +#include "gridgain/impl/portable/portable_id_resolver.h" +#include "gridgain/impl/portable/portable_reader_impl.h" +#include "gridgain/impl/portable/portable_utils.h" +#include "gridgain/portable/portable_type.h" +#include "gridgain/grid_error.h" + +using namespace gridgain::impl::interop; +using namespace gridgain::impl::portable; +using namespace gridgain::portable; + +namespace gridgain +{ + namespace impl + { + namespace portable + { + /** Constructor for reading an object with named fields; the reader starts in non-raw mode. */ + PortableReaderImpl::PortableReaderImpl(InteropInputStream* stream, PortableIdResolver* idRslvr, + int32_t pos, bool usrType, int32_t typeId, int32_t hashCode, int32_t len, int32_t rawOff) : + stream(stream), idRslvr(idRslvr), pos(pos), usrType(usrType), typeId(typeId), + hashCode(hashCode), len(len), rawOff(rawOff), rawMode(false), + elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0) + { + // No-op. + } + + /** Constructor for raw reads only: no ID resolver is set and the reader starts in raw mode. */ + PortableReaderImpl::PortableReaderImpl(InteropInputStream* stream) : + stream(stream), idRslvr(NULL), pos(0), usrType(false), typeId(0), hashCode(0), + len(0), rawOff(0), rawMode(true), + elemIdGen(0), elemId(0), elemCnt(-1), elemRead(0) + { + // No-op. 
+ } + + /* + * Primitive readers. Overloads without a field name delegate to ReadRaw/ReadRawArray, + * overloads with a field name delegate to Read/ReadArray. + */ + + int8_t PortableReaderImpl::ReadInt8() + { + return ReadRaw<int8_t>(PortableUtils::ReadInt8); + } + + int32_t PortableReaderImpl::ReadInt8Array(int8_t* res, const int32_t len) + { + return ReadRawArray<int8_t>(res, len, PortableUtils::ReadInt8Array, GG_TYPE_ARRAY_BYTE); + } + + int8_t PortableReaderImpl::ReadInt8(const char* fieldName) + { + return Read(fieldName, PortableUtils::ReadInt8, GG_TYPE_BYTE, static_cast<int8_t>(0)); + } + + int32_t PortableReaderImpl::ReadInt8Array(const char* fieldName, int8_t* res, const int32_t len) + { + return ReadArray<int8_t>(fieldName, res, len,PortableUtils::ReadInt8Array, GG_TYPE_ARRAY_BYTE); + } + + bool PortableReaderImpl::ReadBool() + { + return ReadRaw<bool>(PortableUtils::ReadBool); + } + + int32_t PortableReaderImpl::ReadBoolArray(bool* res, const int32_t len) + { + return ReadRawArray<bool>(res, len, PortableUtils::ReadBoolArray, GG_TYPE_ARRAY_BOOL); + } + + bool PortableReaderImpl::ReadBool(const char* fieldName) + { + return Read(fieldName, PortableUtils::ReadBool, GG_TYPE_BOOL, static_cast<bool>(0)); + } + + int32_t PortableReaderImpl::ReadBoolArray(const char* fieldName, bool* res, const int32_t len) + { + return ReadArray<bool>(fieldName, res, len,PortableUtils::ReadBoolArray, GG_TYPE_ARRAY_BOOL); + } + + int16_t PortableReaderImpl::ReadInt16() + { + return ReadRaw<int16_t>(PortableUtils::ReadInt16); + } + + int32_t PortableReaderImpl::ReadInt16Array(int16_t* res, const int32_t len) + { + return ReadRawArray<int16_t>(res, len, PortableUtils::ReadInt16Array, GG_TYPE_ARRAY_SHORT); + } + + int16_t PortableReaderImpl::ReadInt16(const char* fieldName) + { + return Read(fieldName, PortableUtils::ReadInt16, GG_TYPE_SHORT, static_cast<int16_t>(0)); + } + + int32_t PortableReaderImpl::ReadInt16Array(const char* fieldName, int16_t* res, const int32_t len) + { + return ReadArray<int16_t>(fieldName, res, len, PortableUtils::ReadInt16Array, GG_TYPE_ARRAY_SHORT); + } + + uint16_t PortableReaderImpl::ReadUInt16() + { + 
return ReadRaw<uint16_t>(PortableUtils::ReadUInt16); + } + + int32_t PortableReaderImpl::ReadUInt16Array(uint16_t* res, const int32_t len) + { + return ReadRawArray<uint16_t>(res, len, PortableUtils::ReadUInt16Array, GG_TYPE_ARRAY_CHAR); + } + + uint16_t PortableReaderImpl::ReadUInt16(const char* fieldName) + { + return Read(fieldName, PortableUtils::ReadUInt16, GG_TYPE_CHAR, static_cast<uint16_t>(0)); + } + + int32_t PortableReaderImpl::ReadUInt16Array(const char* fieldName, uint16_t* res, const int32_t len) + { + return ReadArray<uint16_t>(fieldName, res, len,PortableUtils::ReadUInt16Array, GG_TYPE_ARRAY_CHAR); + } + + int32_t PortableReaderImpl::ReadInt32() + { + return ReadRaw<int32_t>(PortableUtils::ReadInt32); + } + + int32_t PortableReaderImpl::ReadInt32Array(int32_t* res, const int32_t len) + { + return ReadRawArray<int32_t>(res, len, PortableUtils::ReadInt32Array, GG_TYPE_ARRAY_INT); + } + + int32_t PortableReaderImpl::ReadInt32(const char* fieldName) + { + return Read(fieldName, PortableUtils::ReadInt32, GG_TYPE_INT, static_cast<int32_t>(0)); + } + + int32_t PortableReaderImpl::ReadInt32Array(const char* fieldName, int32_t* res, const int32_t len) + { + return ReadArray<int32_t>(fieldName, res, len,PortableUtils::ReadInt32Array, GG_TYPE_ARRAY_INT); + } + + int64_t PortableReaderImpl::ReadInt64() + { + return ReadRaw<int64_t>(PortableUtils::ReadInt64); + } + + int32_t PortableReaderImpl::ReadInt64Array(int64_t* res, const int32_t len) + { + return ReadRawArray<int64_t>(res, len, PortableUtils::ReadInt64Array, GG_TYPE_ARRAY_LONG); + } + + int64_t PortableReaderImpl::ReadInt64(const char* fieldName) + { + return Read(fieldName, PortableUtils::ReadInt64, GG_TYPE_LONG, static_cast<int64_t>(0)); + } + + int32_t PortableReaderImpl::ReadInt64Array(const char* fieldName, int64_t* res, const int32_t len) + { + return ReadArray<int64_t>(fieldName, res, len,PortableUtils::ReadInt64Array, GG_TYPE_ARRAY_LONG); + } + + float PortableReaderImpl::ReadFloat() + { + return 
ReadRaw<float>(PortableUtils::ReadFloat); + } + + int32_t PortableReaderImpl::ReadFloatArray(float* res, const int32_t len) + { + return ReadRawArray<float>(res, len, PortableUtils::ReadFloatArray, GG_TYPE_ARRAY_FLOAT); + } + + float PortableReaderImpl::ReadFloat(const char* fieldName) + { + return Read(fieldName, PortableUtils::ReadFloat, GG_TYPE_FLOAT, static_cast<float>(0)); + } + + int32_t PortableReaderImpl::ReadFloatArray(const char* fieldName, float* res, const int32_t len) + { + return ReadArray<float>(fieldName, res, len,PortableUtils::ReadFloatArray, GG_TYPE_ARRAY_FLOAT); + } + + double PortableReaderImpl::ReadDouble() + { + return ReadRaw<double>(PortableUtils::ReadDouble); + } + + int32_t PortableReaderImpl::ReadDoubleArray(double* res, const int32_t len) + { + return ReadRawArray<double>(res, len, PortableUtils::ReadDoubleArray, GG_TYPE_ARRAY_DOUBLE); + } + + double PortableReaderImpl::ReadDouble(const char* fieldName) + { + return Read(fieldName, PortableUtils::ReadDouble, GG_TYPE_DOUBLE, static_cast<double>(0)); + } + + int32_t PortableReaderImpl::ReadDoubleArray(const char* fieldName, double* res, const int32_t len) + { + return ReadArray<double>(fieldName, res, len,PortableUtils::ReadDoubleArray, GG_TYPE_ARRAY_DOUBLE); + } + + /** Reads a Guid in raw mode. */ + Guid PortableReaderImpl::ReadGuid() + { + CheckRawMode(true); + CheckSingleMode(true); + + return ReadNullable(stream, PortableUtils::ReadGuid, GG_TYPE_UUID); + } + + /** Reads a Guid array in raw mode. */ + int32_t PortableReaderImpl::ReadGuidArray(Guid* res, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + return ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, GG_TYPE_ARRAY_UUID); + } + + /** Reads a Guid field; returns a default-constructed Guid when SeekField() yields a non-positive length. */ + Guid PortableReaderImpl::ReadGuid(const char* fieldName) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldLen = SeekField(fieldId); + + if (fieldLen > 0) + return ReadNullable(stream, PortableUtils::ReadGuid, GG_TYPE_UUID); + + return Guid(); + } + + /** Reads a Guid array field; returns -1 when SeekField() yields a non-positive length. */ + int32_t 
PortableReaderImpl::ReadGuidArray(const char* fieldName, Guid* res, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t pos = stream->Position(); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldLen = SeekField(fieldId); + + if (fieldLen > 0) { + int32_t realLen = ReadArrayInternal<Guid>(res, len, stream, ReadGuidArrayInternal, GG_TYPE_ARRAY_UUID); + + // If actual read didn't occur return to initial position so that we do not perform + // N jumps to find the field again, where N is total amount of fields. + if (realLen != -1 && (!res || realLen > len)) + stream->Position(pos); + + return realLen; + } + + return -1; + } + + /** Reads len nullable Guid values from the stream into res. */ + void PortableReaderImpl::ReadGuidArrayInternal(InteropInputStream* stream, Guid* res, const int32_t len) + { + for (int i = 0; i < len; i++) + *(res + i) = ReadNullable<Guid>(stream, PortableUtils::ReadGuid, GG_TYPE_UUID); + } + + /** Reads a string in raw mode; see ReadStringInternal() for the return value. */ + int32_t PortableReaderImpl::ReadString(char* res, const int32_t len) + { + CheckRawMode(true); + CheckSingleMode(true); + + return ReadStringInternal(res, len); + } + + /** Reads a string field; returns -1 when SeekField() yields a non-positive length. */ + int32_t PortableReaderImpl::ReadString(const char* fieldName, char* res, const int32_t len) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t pos = stream->Position(); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldLen = SeekField(fieldId); + + if (fieldLen > 0) { + int32_t realLen = ReadStringInternal(res, len); + + // If actual read didn't occur return to initial position so that we do not perform + // N jumps to find the field again, where N is total amount of fields. 
+ if (realLen != -1 && (!res || realLen > len)) + stream->Position(pos); + + return realLen; + } + + return -1; + } + + /** Starts a raw-mode string array read session; returns the session ID. */ + int32_t PortableReaderImpl::ReadStringArray(int32_t* size) + { + return StartContainerSession(true, GG_TYPE_ARRAY_STRING, size); + } + + /** Starts a string array read session for a field; size is set to -1 when the field is not found. */ + int32_t PortableReaderImpl::ReadStringArray(const char* fieldName, int32_t* size) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldLen = SeekField(fieldId); + + if (fieldLen > 0) + return StartContainerSession(false, GG_TYPE_ARRAY_STRING, size); + else { + *size = -1; + + return ++elemIdGen; + } + } + + /** + * Reads the next string element of the session with the given ID. The element is counted as + * read only when the stream position advanced; the session is reset after the last element. + */ + int32_t PortableReaderImpl::ReadStringElement(int32_t id, char* res, const int32_t len) + { + CheckSession(id); + + int32_t posBefore = stream->Position(); + + int32_t realLen = ReadStringInternal(res, len); + + int32_t posAfter = stream->Position(); + + if (posAfter > posBefore && ++elemRead == elemCnt) { + elemId = 0; + elemCnt = -1; + elemRead = 0; + } + + return realLen; + } + + /** + * Reads a string at the current stream position. + * + * @param res Target buffer, may be null. + * @param len Target buffer length. + * @return String length read from the stream, or -1 for a null header. Characters are copied + * only when res is not null and len is at least that length; otherwise the stream is moved + * back so that the same string can be read again. + */ + int32_t PortableReaderImpl::ReadStringInternal(char* res, const int32_t len) + { + int8_t hdr = stream->ReadInt8(); + + if (hdr == GG_TYPE_STRING) { + bool utf8Mode = stream->ReadBool(); + int32_t realLen = stream->ReadInt32(); + + if (res && len >= realLen) { + if (utf8Mode) + { + for (int i = 0; i < realLen; i++) + *(res + i) = static_cast<char>(stream->ReadInt8()); + } + else + { + for (int i = 0; i < realLen; i++) + *(res + i) = static_cast<char>(stream->ReadUInt16()); + } + + if (len > realLen) + *(res + realLen) = 0; // Set NULL terminator if possible. 
+ } + else + stream->Position(stream->Position() - 6); // Step back over header, mode flag and length (presumably 1 + 1 + 4 bytes). + + return realLen; + } + else if (hdr != GG_HDR_NULL) + ThrowOnInvalidHeader(GG_TYPE_STRING, hdr); // A string header was expected here. + + return -1; + } + + /** Starts a raw-mode array read session; returns the session ID. */ + int32_t PortableReaderImpl::ReadArray(int32_t* size) + { + return StartContainerSession(true, GG_TYPE_ARRAY, size); + } + + /** Starts an array read session for a field; size is set to -1 when the field is not found. */ + int32_t PortableReaderImpl::ReadArray(const char* fieldName, int32_t* size) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldLen = SeekField(fieldId); + + if (fieldLen > 0) + return StartContainerSession(false, GG_TYPE_ARRAY, size); + else { + *size = -1; + + return ++elemIdGen; + } + } + + /** Starts a raw-mode collection read session; the collection type is read unless size is -1. */ + int32_t PortableReaderImpl::ReadCollection(CollectionType* typ, int32_t* size) + { + int32_t id = StartContainerSession(true, GG_TYPE_COLLECTION, size); + + if (*size == -1) + *typ = GG_COLLECTION_UNDEFINED; + else + *typ = static_cast<CollectionType>(stream->ReadInt8()); + + return id; + } + + /** Starts a collection read session for a field; type is undefined and size is -1 when the field is not found. */ + int32_t PortableReaderImpl::ReadCollection(const char* fieldName, CollectionType* typ, int32_t* size) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldLen = SeekField(fieldId); + + if (fieldLen > 0) + { + int32_t id = StartContainerSession(false, GG_TYPE_COLLECTION, size); + + if (*size == -1) + *typ = GG_COLLECTION_UNDEFINED; + else + *typ = static_cast<CollectionType>(stream->ReadInt8()); + + return id; + } + else { + *typ = GG_COLLECTION_UNDEFINED; + *size = -1; + + return ++elemIdGen; + } + } + + /** Starts a raw-mode map read session; the map type is read unless size is -1. */ + int32_t PortableReaderImpl::ReadMap(MapType* typ, int32_t* size) + { + int32_t id = StartContainerSession(true, GG_TYPE_MAP, size); + + if (*size == -1) + *typ = GG_MAP_UNDEFINED; + else + *typ = static_cast<MapType>(stream->ReadInt8()); + + return id; + } + + /** Starts a map read session for a field; type is undefined and size is -1 when the field is not found. */ + int32_t PortableReaderImpl::ReadMap(const char* fieldName, MapType* typ, int32_t* size) + { + CheckRawMode(false); + CheckSingleMode(true); + + int32_t fieldId = 
idRslvr->GetFieldId(typeId, fieldName); + int32_t fieldLen = SeekField(fieldId); + + if (fieldLen > 0) + { + int32_t id = StartContainerSession(false, GG_TYPE_MAP, size); + + if (*size == -1) + *typ = GG_MAP_UNDEFINED; + else + *typ = static_cast<MapType>(stream->ReadInt8()); + + return id; + } + else { + *typ = GG_MAP_UNDEFINED; + *size = -1; + + return ++elemIdGen; + } + } + + /** Tells whether the session with the given ID is active and has unread elements. */ + bool PortableReaderImpl::HasNextElement(int32_t id) + { + return elemId == id && elemRead < elemCnt; + } + + /** Switches the reader to raw mode and positions the stream at pos + rawOff. */ + void PortableReaderImpl::SetRawMode() + { + CheckRawMode(false); + CheckSingleMode(true); + + stream->Position(pos + rawOff); + rawMode = true; + } + + /* + * Top-level object readers. Primitive specializations delegate to ReadTopObject0 with the expected + * type header, the stream read function and a zero value (presumably the result for a null object - + * confirm against ReadTopObject0). + */ + + template <> + int8_t PortableReaderImpl::ReadTopObject<int8_t>() + { + return ReadTopObject0(GG_TYPE_BYTE, PortableUtils::ReadInt8, static_cast<int8_t>(0)); + } + + template <> + bool PortableReaderImpl::ReadTopObject<bool>() + { + return ReadTopObject0(GG_TYPE_BOOL, PortableUtils::ReadBool, static_cast<bool>(0)); + } + + template <> + int16_t PortableReaderImpl::ReadTopObject<int16_t>() + { + return ReadTopObject0(GG_TYPE_SHORT, PortableUtils::ReadInt16, static_cast<int16_t>(0)); + } + + template <> + uint16_t PortableReaderImpl::ReadTopObject<uint16_t>() + { + return ReadTopObject0(GG_TYPE_CHAR, PortableUtils::ReadUInt16, static_cast<uint16_t>(0)); + } + + template <> + int32_t PortableReaderImpl::ReadTopObject<int32_t>() + { + return ReadTopObject0(GG_TYPE_INT, PortableUtils::ReadInt32, static_cast<int32_t>(0)); + } + + template <> + int64_t PortableReaderImpl::ReadTopObject<int64_t>() + { + return ReadTopObject0(GG_TYPE_LONG, PortableUtils::ReadInt64, static_cast<int64_t>(0)); + } + + template <> + float PortableReaderImpl::ReadTopObject<float>() + { + return ReadTopObject0(GG_TYPE_FLOAT, PortableUtils::ReadFloat, static_cast<float>(0)); + } + + template <> + double PortableReaderImpl::ReadTopObject<double>() + { + return ReadTopObject0(GG_TYPE_DOUBLE, PortableUtils::ReadDouble, static_cast<double>(0)); + } + + /** Reads a top-level Guid: a default-constructed Guid for a null header, an error for any other non-UUID header. */ + template <> + Guid 
PortableReaderImpl::ReadTopObject<Guid>() + { + int8_t typeId = stream->ReadInt8(); + + if (typeId == GG_TYPE_UUID) + return PortableUtils::ReadGuid(stream); + else if (typeId == GG_HDR_NULL) + return Guid(); + else { + int32_t pos = stream->Position() - 1; + + GG_ERROR_FORMATTED_3(GridError::GG_ERR_PORTABLE, "Invalid header", "position", pos, "expected", GG_TYPE_UUID, "actual", typeId) + } + } + + /** Returns the underlying stream. */ + InteropInputStream* PortableReaderImpl::GetStream() + { + return stream; + } + + /** + * Positions the stream right after the (ID, length) header of the requested field. + * + * @param fieldId Field ID. + * @return Field length, or -1 when the field is not found. + */ + int32_t PortableReaderImpl::SeekField(const int32_t fieldId) + { + // We assume that it is very likely that fields are read in the same + // order as they were initially written. So we start seeking field + // from current stream position making a "loop" up to this position. + int32_t marker = stream->Position(); + + for (int32_t curPos = marker; curPos < pos + rawOff;) + { + int32_t curFieldId = stream->ReadInt32(); + int32_t curFieldLen = stream->ReadInt32(); + + if (fieldId == curFieldId) + return curFieldLen; + else { + curPos = stream->Position() + curFieldLen; + + stream->Position(curPos); + } + } + + // Wrap around to the first field; 18 is presumably the object header length - TODO confirm. + stream->Position(pos + 18); + + for (int32_t curPos = stream->Position(); curPos < marker;) + { + int32_t curFieldId = stream->ReadInt32(); + int32_t curFieldLen = stream->ReadInt32(); + + if (fieldId == curFieldId) + return curFieldLen; + else { + curPos = stream->Position() + curFieldLen; + + stream->Position(curPos); + } + } + + return -1; + } + + /** Raises a portable error when the current raw mode differs from the expected one. */ + void PortableReaderImpl::CheckRawMode(bool expected) + { + if (expected && !rawMode) { + GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Operation can be performed only in raw mode.") + } + else if (!expected && rawMode) { + GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Operation cannot be performed in raw mode.") + } + } + + /** Raises a portable error when a container session is active (elemId != 0) contrary to the expectation, or vice versa. */ + void PortableReaderImpl::CheckSingleMode(bool expected) + { + if (expected && elemId != 0) { + GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Operation cannot be performed when container is being read."); + } + else if (!expected && elemId 
== 0) { + GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Operation can be performed only when container is being read."); + } + } + + /** + * Reads a container header and starts a read session when the container is not empty. + * + * @param expRawMode Expected raw mode. + * @param expHdr Expected container header. + * @param size Receives element count: 0 for an empty container, -1 for a null header. + * @return Session ID; a new ID is generated even when no session is started. + */ + int32_t PortableReaderImpl::StartContainerSession(bool expRawMode, int8_t expHdr, int32_t* size) + { + CheckRawMode(expRawMode); + CheckSingleMode(true); + + int8_t hdr = stream->ReadInt8(); + + if (hdr == expHdr) + { + int32_t cnt = stream->ReadInt32(); + + if (cnt != 0) + { + elemId = ++elemIdGen; + elemCnt = cnt; + elemRead = 0; + + *size = cnt; + + return elemId; + } + else + { + *size = 0; + + return ++elemIdGen; + } + } + else if (hdr == GG_HDR_NULL) { + *size = -1; + + return ++elemIdGen; + } + else { + ThrowOnInvalidHeader(expHdr, hdr); + + return 0; + } + } + + /** Raises a portable error when the given session ID is not the active one. */ + void PortableReaderImpl::CheckSession(int32_t expSes) + { + if (elemId != expSes) { + GG_ERROR_1(GridError::GG_ERR_PORTABLE, "Containter read session has been finished or is not started yet."); + } + } + + /** Raises an "Invalid header" portable error for the given position, expected and actual headers. */ + void PortableReaderImpl::ThrowOnInvalidHeader(int32_t pos, int8_t expHdr, int8_t hdr) + { + GG_ERROR_FORMATTED_3(GridError::GG_ERR_PORTABLE, "Invalid header", "position", pos, "expected", expHdr, "actual", hdr) + } + + /** Same as above, using the position of the header byte just read (current position - 1). */ + void PortableReaderImpl::ThrowOnInvalidHeader(int8_t expHdr, int8_t hdr) + { + int32_t pos = stream->Position() - 1; + + ThrowOnInvalidHeader(pos, expHdr, hdr); + } + } + } +} \ No newline at end of file
