Repository: ignite Updated Branches: refs/heads/master 79bac4f87 -> d4da92b7a
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp index dfef8e4..858ee77 100644 --- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp @@ -17,8 +17,11 @@ #include <ignite/common/utils.h> -#include "ignite/impl/cache/cache_impl.h" -#include "ignite/impl/binary/binary_type_updater_impl.h" +#include <ignite/impl/cache/cache_impl.h> +#include <ignite/impl/binary/binary_type_updater_impl.h> +#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h> + +#include <ignite/cache/query/continuous/continuous_query_handle.h> using namespace ignite::common::concurrent; using namespace ignite::jni::java; @@ -381,14 +384,93 @@ namespace ignite IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); } - struct DummyQry { void Write(BinaryRawWriter&) const { }}; + struct Dummy + { + void Write(BinaryRawWriter&) const + { + // No-op. + } + }; ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry, IgniteError& err) { - DummyQry dummy; + Dummy dummy; return QueryContinuous(qry, dummy, -1, OP_QRY_CONTINUOUS, err); } + + template <typename T> + QueryCursorImpl* CacheImpl::QueryInternal(const T& qry, int32_t typ, IgniteError& err) + { + JniErrorInfo jniErr; + + SharedPointer<InteropMemory> mem = GetEnvironment().AllocateMemory(); + InteropMemory* mem0 = mem.Get(); + InteropOutputStream out(mem0); + BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); + BinaryRawWriter rawWriter(&writer); + + qry.Write(rawWriter); + + out.Synchronize(); + + jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpQueryCursor(GetTarget(), + typ, mem.Get()->PointerLong(), &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + return new QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef); + + return 0; + } + + template <typename T> + ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry, + const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err) + { + JniErrorInfo jniErr; + + SharedPointer<InteropMemory> mem = GetEnvironment().AllocateMemory(); + InteropMemory* mem0 = mem.Get(); + InteropOutputStream out(mem0); + BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); + BinaryRawWriter rawWriter(&writer); + + const ContinuousQueryImplBase& qry0 = *qry.Get(); + + int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry); + + rawWriter.WriteInt64(handle); + rawWriter.WriteBool(qry0.GetLocal()); + + event::CacheEntryEventFilterHolderBase& filterOp = qry0.GetFilterHolder(); + + filterOp.Write(writer); + + rawWriter.WriteInt32(qry0.GetBufferSize()); + rawWriter.WriteInt64(qry0.GetTimeInterval()); + + // Autounsubscribe is a filter feature. + rawWriter.WriteBool(false); + + // Writing initial query. When there is not initial query writing -1. + rawWriter.WriteInt32(typ); + if (typ != -1) + initialQry.Write(rawWriter); + + out.Synchronize(); + + jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(), + cmd, mem.Get()->PointerLong(), &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + if (jniErr.code == IGNITE_JNI_ERR_SUCCESS) + return new ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef); + + return 0; + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp index b2fa1fd..b15183b 100644 --- a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp @@ -84,11 +84,6 @@ namespace ignite return new QueryCursorImpl(env, res); } - - void ContinuousQueryHandleImpl::SetQuery(SP_ContinuousQueryImplBase query) - { - qry = query; - } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp new file mode 100644 index 0000000..2e09de2 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/ignite_binding_impl.cpp @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/impl/ignite_environment.h> +#include <ignite/impl/ignite_binding_impl.h> + +using namespace ignite::common::concurrent; + +namespace ignite +{ + namespace impl + { + IgniteBindingImpl::IgniteBindingImpl(IgniteEnvironment &env) : + env(env), + callbacks() + { + // No-op. + } + + int64_t IgniteBindingImpl::InvokeCallback(bool& found, int32_t type, int32_t id, + binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer) + { + int64_t key = makeKey(type, id); + + CsLockGuard guard(lock); + + std::map<int64_t, Callback*>::iterator it = callbacks.find(key); + + found = it != callbacks.end(); + + if (found) + { + Callback* callback = it->second; + + // We have found callback and does not need lock here anymore. + guard.Reset(); + + return callback(reader, writer, env); + } + + return 0; + } + + void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* proc, IgniteError& err) + { + int64_t key = makeKey(type, id); + + CsLockGuard guard(lock); + + bool inserted = callbacks.insert(std::make_pair(key, proc)).second; + + guard.Reset(); + + if (!inserted) + { + std::stringstream builder; + + builder << "Trying to register multiple PRC callbacks with the same ID. [type=" + << type << ", id=" << id << ']'; + + err = IgniteError(IgniteError::IGNITE_ERR_ENTRY_PROCESSOR, builder.str().c_str()); + } + } + + void IgniteBindingImpl::RegisterCallback(int32_t type, int32_t id, Callback* callback) + { + IgniteError err; + + RegisterCallback(type, id, callback, err); + + IgniteError::ThrowIfNeeded(err); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_environment.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp index b37fa8f..4e2a1f2 100644 --- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp +++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp @@ -15,14 +15,18 @@ * limitations under the License. */ -#include "ignite/impl/interop/interop_external_memory.h" -#include "ignite/impl/binary/binary_reader_impl.h" -#include "ignite/impl/ignite_environment.h" -#include "ignite/cache/query/continuous/continuous_query.h" -#include "ignite/binary/binary.h" -#include "ignite/impl/binary/binary_type_updater_impl.h" -#include "ignite/impl/module_manager.h" -#include "ignite/ignite_binding.h" +#include <ignite/impl/interop/interop_external_memory.h> +#include <ignite/impl/binary/binary_reader_impl.h> +#include <ignite/impl/binary/binary_type_updater_impl.h> +#include <ignite/impl/module_manager.h> +#include <ignite/impl/ignite_binding_impl.h> + +#include <ignite/binary/binary.h> +#include <ignite/cache/query/continuous/continuous_query.h> +#include <ignite/ignite_binding.h> +#include <ignite/ignite_binding_context.h> + +#include <ignite/impl/ignite_environment.h> using namespace ignite::common::concurrent; using namespace ignite::jni::java; @@ -42,6 +46,8 @@ namespace ignite { CACHE_INVOKE = 8, CONTINUOUS_QUERY_LISTENER_APPLY = 18, + CONTINUOUS_QUERY_FILTER_CREATE = 19, + CONTINUOUS_QUERY_FILTER_APPLY = 20, CONTINUOUS_QUERY_FILTER_RELEASE = 21, REALLOC = 36, ON_START = 49, @@ -57,6 +63,7 @@ namespace ignite */ long long IGNITE_CALL InLongOutLong(void* target, int type, long long val) { + int64_t res = 0; SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target); switch (type) @@ -77,6 +84,24 @@ namespace ignite break; } + case CONTINUOUS_QUERY_FILTER_CREATE: + { + SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val); + + res = env->Get()->OnContinuousQueryFilterCreate(mem); + + break; + } + + case CONTINUOUS_QUERY_FILTER_APPLY: + { + SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val); + + res = env->Get()->OnContinuousQueryFilterApply(mem); + + break; + } + case CONTINUOUS_QUERY_FILTER_RELEASE: { // No-op. @@ -98,7 +123,7 @@ namespace ignite } } - return 0; + return res; } /** @@ -152,10 +177,14 @@ namespace ignite registry(DEFAULT_FAST_PATH_CONTAINERS_CAP, DEFAULT_SLOW_PATH_CONTAINERS_CAP), metaMgr(new BinaryTypeManager()), metaUpdater(0), - binding(new IgniteBindingImpl()), - moduleMgr(new ModuleManager(GetBindingContext())) + binding(), + moduleMgr() { - // No-op. + binding = SharedPointer<IgniteBindingImpl>(new IgniteBindingImpl(*this)); + + IgniteBindingContext bindingContext(cfg, GetBinding()); + + moduleMgr = SharedPointer<ModuleManager>(new ModuleManager(bindingContext)); } IgniteEnvironment::~IgniteEnvironment() @@ -263,14 +292,9 @@ namespace ignite return metaUpdater; } - IgniteBinding IgniteEnvironment::GetBinding() const - { - return IgniteBinding(binding); - } - - IgniteBindingContext IgniteEnvironment::GetBindingContext() const + SharedPointer<IgniteBindingImpl> IgniteEnvironment::GetBinding() const { - return IgniteBindingContext(*cfg, GetBinding()); + return binding; } void IgniteEnvironment::ProcessorReleaseStart() @@ -321,6 +345,62 @@ namespace ignite } } + int64_t IgniteEnvironment::OnContinuousQueryFilterCreate(SharedPointer<InteropMemory>& mem) + { + if (!binding.Get()) + throw IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "IgniteBinding is not initialized."); + + InteropInputStream inStream(mem.Get()); + BinaryReaderImpl reader(&inStream); + + InteropOutputStream outStream(mem.Get()); + BinaryWriterImpl writer(&outStream, GetTypeManager()); + + BinaryObjectImpl binFilter = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position()); + + int32_t filterId = binFilter.GetTypeId(); + + bool invoked = false; + + int64_t res = binding.Get()->InvokeCallback(invoked, + IgniteBindingImpl::CACHE_ENTRY_FILTER_CREATE, filterId, reader, writer); + + if (!invoked) + { + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "C++ remote filter is not registered on the node (did you compile your program without -rdynamic?).", + "filterId", filterId); + } + + outStream.Synchronize(); + + return res; + } + + int64_t IgniteEnvironment::OnContinuousQueryFilterApply(SharedPointer<InteropMemory>& mem) + { + InteropInputStream inStream(mem.Get()); + BinaryReaderImpl reader(&inStream); + BinaryRawReader rawReader(&reader); + + int64_t handle = rawReader.ReadInt64(); + + SharedPointer<ContinuousQueryImplBase> qry = + StaticPointerCast<ContinuousQueryImplBase>(registry.Get(handle)); + + if (!qry.Get()) + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null query for handle.", "handle", handle); + + cache::event::CacheEntryEventFilterBase* filter = qry.Get()->GetFilterHolder().GetFilter(); + + if (!filter) + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_GENERIC, "Null filter for handle.", "handle", handle); + + bool res = filter->ReadAndProcessEvent(rawReader); + + return res ? 1 : 0; + } + void IgniteEnvironment::CacheInvokeCallback(SharedPointer<InteropMemory>& mem) { if (!binding.Get()) @@ -340,9 +420,11 @@ namespace ignite BinaryObjectImpl binProcHolder = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position(), 0); BinaryObjectImpl binProc = binProcHolder.GetField(0); - int64_t procId = binProc.GetTypeId(); + int32_t procId = binProc.GetTypeId(); + + bool invoked = false; - bool invoked = binding.Get()->InvokeCallbackById(procId, reader, writer); + binding.Get()->InvokeCallback(invoked, IgniteBindingImpl::CACHE_ENTRY_PROCESSOR_APPLY, procId, reader, writer); if (!invoked) { http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/src/impl/ignite_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp index fd9bf45..546cd01 100644 --- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp @@ -59,7 +59,7 @@ namespace ignite return env.Get()->Context(); } - IgniteBinding IgniteImpl::GetBinding() + SharedPointer<IgniteBindingImpl> IgniteImpl::GetBinding() { return env.Get()->GetBinding(); }
