http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/cache.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache.h b/modules/platforms/cpp/core/include/ignite/cache/cache.h index a975be3..54c0f96 100644 --- a/modules/platforms/cpp/core/include/ignite/cache/cache.h +++ b/modules/platforms/cpp/core/include/ignite/cache/cache.h @@ -37,6 +37,8 @@ #include "ignite/cache/query/query_sql.h" #include "ignite/cache/query/query_text.h" #include "ignite/cache/query/query_sql_fields.h" +#include "ignite/cache/query/continuous/continuous_query_handle.h" +#include "ignite/cache/query/continuous/continuous_query.h" #include "ignite/impl/cache/cache_impl.h" #include "ignite/impl/operations.h" @@ -1339,6 +1341,106 @@ namespace ignite } /** + * Start continuous query execution. + * + * @param qry Continuous query. + * @return Continuous query handle. + */ + query::continuous::ContinuousQueryHandle<K, V> QueryContinuous( + const query::continuous::ContinuousQuery<K, V>& qry) + { + IgniteError err; + + query::continuous::ContinuousQueryHandle<K, V> res = QueryContinuous(qry, err); + + IgniteError::ThrowIfNeeded(err); + + return res; + } + + /** + * Start continuous query execution. + * + * @param qry Continuous query. + * @param err Error. + * @return Continuous query handle. + */ + query::continuous::ContinuousQueryHandle<K, V> QueryContinuous( + const query::continuous::ContinuousQuery<K, V>& qry, IgniteError& err) + { + using namespace impl::cache::query::continuous; + + if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener()) + { + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "Event listener is not set for ContinuousQuery instance"); + + return query::continuous::ContinuousQueryHandle<K, V>(); + } + + ContinuousQueryHandleImpl* cqImpl; + cqImpl = impl.Get()->QueryContinuous(qry.impl, err); + + if (cqImpl) + cqImpl->SetQuery(qry.impl); + + return query::continuous::ContinuousQueryHandle<K, V>(cqImpl); + } + + /** + * Start continuous query execution with the initial query. + * + * @param qry Continuous query. + * @param initialQry Initial query to be executed. + * @return Continuous query handle. + */ + template<typename Q> + query::continuous::ContinuousQueryHandle<K, V> QueryContinuous( + const query::continuous::ContinuousQuery<K, V>& qry, + const Q& initialQry) + { + IgniteError err; + + query::continuous::ContinuousQueryHandle<K, V> res = QueryContinuous(qry, initialQry, err); + + IgniteError::ThrowIfNeeded(err); + + return res; + } + + /** + * Start continuous query execution with the initial query. + * + * @param qry Continuous query. + * @param initialQry Initial query to be executed. + * @param err Error. + * @return Continuous query handle. + */ + template<typename Q> + query::continuous::ContinuousQueryHandle<K, V> QueryContinuous( + const query::continuous::ContinuousQuery<K, V>& qry, + const Q& initialQry, IgniteError& err) + { + using namespace impl::cache::query::continuous; + + if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener()) + { + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "Event listener is not set for ContinuousQuery instance"); + + return query::continuous::ContinuousQueryHandle<K, V>(); + } + + ContinuousQueryHandleImpl* cqImpl; + cqImpl = impl.Get()->QueryContinuous(qry.impl, initialQry, err); + + if (cqImpl) + cqImpl->SetQuery(qry.impl); + + return query::continuous::ContinuousQueryHandle<K, V>(cqImpl); + } + + /** * Check if the instance is valid. * * Invalid instance can be returned if some of the previous @@ -1356,7 +1458,7 @@ namespace ignite private: /** Implementation delegate. */ - ignite::common::concurrent::SharedPointer<impl::cache::CacheImpl> impl; + common::concurrent::SharedPointer<impl::cache::CacheImpl> impl; }; } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h b/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h index c737940..aea5182 100644 --- a/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h +++ b/modules/platforms/cpp/core/include/ignite/cache/cache_entry.h @@ -45,7 +45,9 @@ namespace ignite * Creates instance with both key and value default-constructed. */ CacheEntry() : - key(), val() + key(), + val(), + hasValue(false) { // No-op. } @@ -57,7 +59,9 @@ namespace ignite * @param val Value. */ CacheEntry(const K& key, const V& val) : - key(key), val(val) + key(key), + val(val), + hasValue(true) { // No-op. } @@ -68,7 +72,17 @@ namespace ignite * @param other Other instance. */ CacheEntry(const CacheEntry& other) : - key(other.key), val(other.val) + key(other.key), + val(other.val), + hasValue(other.hasValue) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~CacheEntry() { // No-op. } @@ -84,6 +98,7 @@ namespace ignite { key = other.key; val = other.val; + hasValue = other.hasValue; } return *this; @@ -94,7 +109,7 @@ namespace ignite * * @return Key. */ - K GetKey() const + const K& GetKey() const { return key; } @@ -104,17 +119,30 @@ namespace ignite * * @return Value. */ - V GetValue() const + const V& GetValue() const { return val; } - private: + /** + * Check if the value exists. + * + * @return True, if the value exists. + */ + bool HasValue() const + { + return hasValue; + } + + protected: /** Key. */ K key; /** Value. */ V val; + + /** Indicates whether value exists */ + bool hasValue; }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h new file mode 100644 index 0000000..14fa185 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event.h @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::cache::event::CacheEntryEvent class. + */ + +#ifndef _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT +#define _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT + +#include <ignite/binary/binary_raw_reader.h> +#include <ignite/cache/cache_entry.h> + +namespace ignite +{ + namespace cache + { + /** + * Cache entry event class template. + * + * Both key and value types should be default-constructable, + * copy-constructable and assignable. + */ + template<typename K, typename V> + class CacheEntryEvent : public CacheEntry<K, V> + { + public: + /** + * Default constructor. + * + * Creates instance with all fields default-constructed. + */ + CacheEntryEvent() : + CacheEntry<K, V>(), + oldVal(), + hasOldValue(false) + { + // No-op. + } + + /** + * Copy constructor. + * + * @param other Other instance. + */ + CacheEntryEvent(const CacheEntryEvent<K, V>& other) : + CacheEntry<K, V>(other), + oldVal(other.oldVal), + hasOldValue(other.hasOldValue) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~CacheEntryEvent() + { + // No-op. + } + + /** + * Assignment operator. + * + * @param other Other instance. + * @return *this. + */ + CacheEntryEvent& operator=(const CacheEntryEvent<K, V>& other) + { + if (this != &other) + { + CacheEntry<K, V>::operator=(other); + + oldVal = other.oldVal; + hasOldValue = other.hasOldValue; + } + + return *this; + } + + /** + * Get old value. + * + * @return Old value. + */ + const V& GetOldValue() const + { + return oldVal; + } + + /** + * Check if the old value exists. + * + * @return True, if the old value exists. + */ + bool HasOldValue() const + { + return hasOldValue; + } + + /** + * Reads cache event using provided raw reader. + * + * @param reader Reader to use. + */ + void Read(binary::BinaryRawReader& reader) + { + this->key = reader.ReadObject<K>(); + + this->hasOldValue = reader.TryReadObject(this->oldVal); + this->hasValue = reader.TryReadObject(this->val); + } + + private: + /** Old value. */ + V oldVal; + + /** Indicates whether old value exists */ + bool hasOldValue; + }; + } +} + +#endif //_IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h new file mode 100644 index 0000000..dd8f4a2 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_listener.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::cache::event::CacheEntryEventListener class. + */ + +#ifndef _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER +#define _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER + +#include <stdint.h> + +#include <ignite/cache/event/cache_entry_event.h> + +namespace ignite +{ + namespace cache + { + namespace event + { + /** + * Cache entry event listener. + */ + template<typename K, typename V> + class CacheEntryEventListener + { + public: + /** + * Default constructor. + */ + CacheEntryEventListener() + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~CacheEntryEventListener() + { + // No-op. + } + + /** + * Event callback. + * + * @param evts Events. + * @param num Events number. + */ + virtual void OnEvent(const CacheEntryEvent<K, V>* evts, uint32_t num) = 0; + }; + } + } +} + +#endif //_IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_LISTENER \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h new file mode 100644 index 0000000..563b11a --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::cache::query::continuous::ContinuousQuery class. + */ + +#ifndef _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY +#define _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY + +#include <ignite/impl/cache/query/continuous/continuous_query_impl.h> +#include <ignite/cache/event/cache_entry_event_listener.h> + +namespace ignite +{ + namespace cache + { + // Forward-declaration. + template<typename K, typename V> + class IGNITE_IMPORT_EXPORT Cache; + + namespace query + { + namespace continuous + { + /** + * Continuous query. + * + * Continuous queries allow to register a remote and a listener + * for cache update events. On any update to the related cache + * an event is sent to the node that has executed the query and + * listener is notified on that node. + * + * Continuous query can either be executed on the whole topology + * or only on local node. + * + * To execute the query over the cache use method + * ignite::cache::Cache::QueryContinuous(). + */ + template<typename K, typename V> + class ContinuousQuery + { + friend class Cache<K, V>; + public: + + /** + * Default value for the buffer size. + */ + enum { DEFAULT_BUFFER_SIZE = 1 }; + + /** + * Default value for the time interval. + */ + enum { DEFAULT_TIME_INTERVAL = 0 }; + + /** + * Destructor. + */ + ~ContinuousQuery() + { + // No-op. + } + + /** + * Constructor. + * + * @param lsnr Event listener. Invoked on the node where + * continuous query execution has been started. + */ + ContinuousQuery(Reference<event::CacheEntryEventListener<K, V>> lsnr) : + impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr)) + { + // No-op. + } + + /** + * Constructor. + * + * @param lsnr Event listener Invoked on the node where + * continuous query execution has been started. + * @param loc Whether query should be executed locally. + */ + ContinuousQuery(Reference<event::CacheEntryEventListener<K, V>> lsnr, bool loc) : + impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, loc)) + { + // No-op. + } + + /** + * Set local flag. + * + * @param val Value of the flag. If true, query will be + * executed only on local node, so only local entries + * will be returned as query result. + */ + void SetLocal(bool val) + { + impl.Get()->SetLocal(val); + } + + /** + * Get local flag. + * + * @return Value of the flag. If true, query will be + * executed only on local node, so only local entries + * will be returned as query result. + */ + bool GetLocal() const + { + return impl.Get()->GetLocal(); + } + + /** + * Set buffer size. + * + * When a cache update happens, entry is first + * put into a buffer. Entries from buffer will be sent to + * the master node only if the buffer is full or time + * provided via timeInterval is exceeded. + * + * @param val Buffer size. + */ + void SetBufferSize(int32_t val) + { + impl.Get()->SetBufferSize(val); + } + + /** + * Get buffer size. + * + * When a cache update happens, entry is first + * put into a buffer. Entries from buffer will be sent to + * the master node only if the buffer is full or time + * provided via timeInterval is exceeded. + * + * @return Buffer size. + */ + int32_t GetBufferSize() const + { + return impl.Get()->GetBufferSize(); + } + + /** + * Set time interval. + * + * When a cache update happens, entry is first put into + * a buffer. Entries from buffer are sent to the master node + * only if the buffer is full (its size can be changed via + * SetBufferSize) or time provided via this method is + * exceeded. + * + * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which + * means that time check is disabled and entries will be + * sent only when buffer is full. + * + * @param val Time interval in miliseconds. + */ + void SetTimeInterval(int64_t val) + { + impl.Get()->SetTimeInterval(val); + } + + /** + * Get time interval. + * + * When a cache update happens, entry is first put into + * a buffer. Entries from buffer are sent to the master node + * only if the buffer is full (its size can be changed via + * SetBufferSize) or time provided via SetTimeInterval + * method is exceeded. + * + * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which + * means that time check is disabled and entries will be + * sent only when buffer is full. + * + * @return Time interval. + */ + int64_t GetTimeInterval() const + { + return impl.Get()->GetTimeInterval(); + } + + /** + * Set cache entry event listener. + * + * @param val Cache entry event listener. Invoked on the + * node where continuous query execution has been + * started. + */ + void SetListener(Reference<event::CacheEntryEventListener<K, V>> lsnr) + { + impl.Get()->SetListener(val); + } + + /** + * Get cache entry event listener. + * + * @return Cache entry event listener. + */ + const event::CacheEntryEventListener<K, V>& GetListener() const + { + return impl.Get()->GetListener(); + } + + /** + * Get cache entry event listener. + * + * @return Cache entry event listener. + */ + event::CacheEntryEventListener<K, V>& GetListener() + { + return impl.Get()->GetListener(); + } + + private: + /** Implementation. */ + common::concurrent::SharedPointer<impl::cache::query::continuous::ContinuousQueryImpl<K, V>> impl; + }; + } + } + } +} + +#endif //_IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h new file mode 100644 index 0000000..bbefbcc --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query_handle.h @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::cache::query::continuous::ContinuousQueryHandle class. + */ + +#ifndef _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE +#define _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE + +#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h> + +namespace ignite +{ + namespace cache + { + namespace query + { + namespace continuous + { + /** + * Continuous query handle. + */ + template<typename K, typename V> + class ContinuousQueryHandle + { + public: + /** + * Default constructor. + */ + ContinuousQueryHandle() : + impl() + { + // No-op. + } + + /** + * Constructor. + * + * Internal method. Should not be used by user. + * + * @param impl Implementation. + */ + ContinuousQueryHandle(impl::cache::query::continuous::ContinuousQueryHandleImpl* impl) : + impl(impl) + { + // No-op. + } + + /** + * Gets the cursor for initial query. + * Can be called only once, throws IgniteError on consequent + * calls. + * + * @return Initial query cursor. + */ + QueryCursor<K, V> GetInitialQueryCursor() + { + IgniteError err; + + QueryCursor<K, V> res = GetInitialQueryCursor(err); + + IgniteError::ThrowIfNeeded(err); + + return res; + } + + /** + * Gets the cursor for initial query. + * Can be called only once, results in error on consequent + * calls. + * + * @param err Error. + * @return Initial query cursor. + */ + QueryCursor<K, V> GetInitialQueryCursor(IgniteError& err) + { + impl::cache::query::continuous::ContinuousQueryHandleImpl* impl0 = impl.Get(); + + if (impl0) + return QueryCursor<K, V>(impl0->GetInitialQueryCursor(err)); + else + { + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "Instance is not usable (did you check for error?)."); + + return QueryCursor<K, V>(); + } + } + + /** + * Check if the instance is valid. + * + * Invalid instance can be returned if some of the previous + * operations have resulted in a failure. For example invalid + * instance can be returned by not-throwing version of method + * in case of error. Invalid instances also often can be + * created using default constructor. + * + * @return True if the instance is valid and can be used. + */ + bool IsValid() const + { + return impl.IsValid(); + } + + private: + typedef impl::cache::query::continuous::ContinuousQueryHandleImpl ContinuousQueryHandleImpl; + + /** Implementation delegate. */ + common::concurrent::SharedPointer<ContinuousQueryHandleImpl> impl; + }; + } + } + } +} + +#endif //_IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h index 3e0f177..535e3ec 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h @@ -22,7 +22,10 @@ #include <ignite/cache/query/query_sql.h> #include <ignite/cache/query/query_text.h> #include <ignite/cache/query/query_sql_fields.h> +#include <ignite/cache/query/continuous/continuous_query_handle.h> #include <ignite/impl/cache/query/query_impl.h> +#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h> +#include <ignite/impl/cache/query/continuous/continuous_query_impl.h> #include <ignite/impl/interop/interop_target.h> @@ -309,12 +312,59 @@ namespace ignite * @return Query cursor. */ query::QueryCursorImpl* QuerySqlFields(const ignite::cache::query::SqlFieldsQuery& qry, IgniteError* err); - + + /** + * Start continuous query execution. + * + * @param qry Continuous query. + * @param err Error. + * @return Continuous query handle. + */ + query::continuous::ContinuousQueryHandleImpl* QueryContinuous( + const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry, + IgniteError& err); + + /** + * Start continuous query execution with initial query. + * + * @param qry Continuous query. + * @param initialQry Initial query. + * @param err Error. + * @return Continuous query handle. + */ + query::continuous::ContinuousQueryHandleImpl* QueryContinuous( + const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry, + const ignite::cache::query::SqlQuery& initialQry, IgniteError& err); + + /** + * Start continuous query execution with initial query. + * + * @param qry Continuous query. + * @param initialQry Initial query. + * @param err Error. + * @return Continuous query handle. + */ + query::continuous::ContinuousQueryHandleImpl* QueryContinuous( + const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry, + const ignite::cache::query::TextQuery& initialQry, IgniteError& err); + + /** + * Start continuous query execution with initial query. + * + * @param qry Continuous query. + * @param initialQry Initial query. + * @param err Error. + * @return Continuous query handle. + */ + query::continuous::ContinuousQueryHandleImpl* QueryContinuous( + const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry, + const ignite::cache::query::ScanQuery& initialQry, IgniteError& err); + private: + IGNITE_NO_COPY_ASSIGNMENT(CacheImpl) + /** Name. */ char* name; - - IGNITE_NO_COPY_ASSIGNMENT(CacheImpl) /** * Internal query execution routine. @@ -346,11 +396,67 @@ namespace ignite if (jniErr.code == ignite::java::IGNITE_JNI_ERR_SUCCESS) return new query::QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef); else - return NULL; + return 0; + } + + /** + * Start continuous query execution with the initial query. + * + * @param qry Continuous query. + * @param initialQry Initial query to be executed. + * @param err Error. + * @return Continuous query handle. + */ + template<typename T> + query::continuous::ContinuousQueryHandleImpl* QueryContinuous( + const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry, + const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err) + { + jni::java::JniErrorInfo jniErr; + + common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory(); + interop::InteropMemory* mem0 = mem.Get(); + interop::InteropOutputStream out(mem0); + binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); + ignite::binary::BinaryRawWriter rawWriter(&writer); + + const query::continuous::ContinuousQueryImplBase& qry0 = *qry.Get(); + + int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry); + + rawWriter.WriteInt64(handle); + rawWriter.WriteBool(qry0.GetLocal()); + + // Filters are not supported for now. + rawWriter.WriteBool(false); + rawWriter.WriteNull(); + + rawWriter.WriteInt32(qry0.GetBufferSize()); + rawWriter.WriteInt64(qry0.GetTimeInterval()); + + // Autounsubscribe is a filter feature. + rawWriter.WriteBool(false); + + // Writing initial query. When there is not initial query writing -1. + rawWriter.WriteInt32(typ); + if (typ != -1) + initialQry.Write(rawWriter); + + out.Synchronize(); + + jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(), + cmd, mem.Get()->PointerLong(), &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err); + + if (jniErr.code == java::IGNITE_JNI_ERR_SUCCESS) + return new query::continuous::ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef); + + return 0; } }; } } } -#endif \ No newline at end of file +#endif http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h new file mode 100644 index 0000000..75504b1 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::impl::cache::query::continuous::ContinuousQueryHandleImpl class. + */ + +#ifndef _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_IMPL +#define _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_IMPL + +#include "ignite/cache/query/query_cursor.h" +#include "ignite/impl/cache/query/continuous/continuous_query_impl.h" + +namespace ignite +{ + namespace impl + { + namespace cache + { + namespace query + { + namespace continuous + { + /** + * Continuous query handle implementation. + */ + class IGNITE_IMPORT_EXPORT ContinuousQueryHandleImpl + { + typedef common::concurrent::SharedPointer<IgniteEnvironment> SP_IgniteEnvironment; + typedef common::concurrent::SharedPointer<ContinuousQueryImplBase> SP_ContinuousQueryImplBase; + public: + /** + * Default constructor. + * + * @param env Environment. + * @param javaRef Java reference. + */ + ContinuousQueryHandleImpl(SP_IgniteEnvironment env, int64_t handle, jobject javaRef); + + /** + * Destructor. + */ + ~ContinuousQueryHandleImpl(); + + /** + * Gets the cursor for initial query. + * Can be called only once, throws exception on consequent calls. + * + * @param err Error. + * @return Initial query cursor. + */ + QueryCursorImpl* GetInitialQueryCursor(IgniteError& err); + + /** + * Set query to keep pointer to. + * + * @param query Query. + */ + void SetQuery(SP_ContinuousQueryImplBase query); + + private: + /** Environment. */ + SP_IgniteEnvironment env; + + /** Local handle for handle registry. */ + int64_t handle; + + /** Handle to Java object. */ + jobject javaRef; + + /** Shared pointer to query. Kept for query to live long enough. */ + SP_ContinuousQueryImplBase qry; + + /** Mutex. */ + common::concurrent::CriticalSection mutex; + + /** Cursor extracted. */ + bool extracted; + }; + } + } + } + } +} + +#endif //_IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_HANDLE_IMPL \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h new file mode 100644 index 0000000..50ced12 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::impl::cache::query::continuous::ContinuousQueryImpl class. + */ + +#ifndef _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL +#define _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL + +#include <stdint.h> + +#include <ignite/reference.h> + +#include <ignite/cache/event/cache_entry_event_listener.h> +#include <ignite/binary/binary_raw_reader.h> + +namespace ignite +{ + namespace impl + { + namespace cache + { + namespace query + { + namespace continuous + { + /** + * Continuous query base implementation class. + * + * Continuous queries allow to register a remote and a listener + * for cache update events. On any update to the related cache + * an event is sent to the node that has executed the query and + * listener is notified on that node. + * + * Continuous query can either be executed on the whole topology + * or only on local node. + * + * To execute the query over the cache use method + * ignite::cache::Cache::QueryContinuous(). + */ + class ContinuousQueryImplBase + { + public: + /** + * Destructor. + */ + virtual ~ContinuousQueryImplBase() + { + // No-op. + } + + /** + * Default value for the buffer size. + */ + enum { DEFAULT_BUFFER_SIZE = 1 }; + + /** + * Default value for the time interval. + */ + enum { DEFAULT_TIME_INTERVAL = 0 }; + + /** + * Constructor. + * + * @param loc Whether query should be executed locally. + */ + explicit ContinuousQueryImplBase(bool loc) : + local(loc), + bufferSize(DEFAULT_BUFFER_SIZE), + timeInterval(DEFAULT_TIME_INTERVAL) + { + // No-op. + } + + /** + * Set local flag. + * + * @param val Value of the flag. If true, query will be + * executed only on local node, so only local entries + * will be returned as query result. + */ + void SetLocal(bool val) + { + local = val; + } + + /** + * Get local flag. + * + * @return Value of the flag. If true, query will be + * executed only on local node, so only local entries + * will be returned as query result. + */ + bool GetLocal() const + { + return local; + } + + /** + * Set buffer size. + * + * When a cache update happens, entry is first + * put into a buffer. Entries from buffer will be sent to + * the master node only if the buffer is full or time + * provided via timeInterval is exceeded. + * + * @param val Buffer size. + */ + void SetBufferSize(int32_t val) + { + bufferSize = val; + } + + /** + * Get buffer size. + * + * When a cache update happens, entry is first + * put into a buffer. Entries from buffer will be sent to + * the master node only if the buffer is full or time + * provided via timeInterval is exceeded. + * + * @return Buffer size. + */ + int32_t GetBufferSize() const + { + return bufferSize; + } + + /** + * Set time interval. + * + * When a cache update happens, entry is first put into + * a buffer. Entries from buffer are sent to the master node + * only if the buffer is full (its size can be changed via + * SetBufferSize) or time provided via this method is + * exceeded. + * + * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which + * means that time check is disabled and entries will be + * sent only when buffer is full. + * + * @param val Time interval in miliseconds. + */ + void SetTimeInterval(int64_t val) + { + timeInterval = val; + } + + /** + * Get time interval. + * + * When a cache update happens, entry is first put into + * a buffer. Entries from buffer are sent to the master node + * only if the buffer is full (its size can be changed via + * SetBufferSize) or time provided via SetTimeInterval + * method is exceeded. + * + * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which + * means that time check is disabled and entries will be + * sent only when buffer is full. + * + * @return Time interval. + */ + int64_t GetTimeInterval() const + { + return timeInterval; + } + + /** + * Callback that reads and processes cache events. + * + * @param reader Reader to use. + */ + virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader) = 0; + + private: + /** + * Local flag. When set query will be executed only on local + * node, so only local entries will be returned as query + * result. + * + * Default value is false. + */ + bool local; + + /** + * Buffer size. When a cache update happens, entry is first + * put into a buffer. Entries from buffer will be sent to + * the master node only if the buffer is full or time + * provided via timeInterval is exceeded. + * + * Default value is DEFAULT_BUFFER_SIZE. + */ + int32_t bufferSize; + + /** + * Time interval in miliseconds. When a cache update + * happens, entry is first put into a buffer. Entries from + * buffer will be sent to the master node only if the buffer + * is full (its size can be changed via SetBufferSize) or + * time provided via SetTimeInterval method is exceeded. + * + * Default value is DEFAULT_TIME_INTERVAL, i.e. 0, which + * means that time check is disabled and entries will be + * sent only when buffer is full. + */ + int64_t timeInterval; + }; + + /** + * Continuous query implementation. + * + * Continuous queries allow to register a remote and a listener + * for cache update events. On any update to the related cache + * an event is sent to the node that has executed the query and + * listener is notified on that node. + * + * Continuous query can either be executed on the whole topology + * or only on local node. + * + * To execute the query over the cache use method + * ignite::cache::Cache::QueryContinuous(). + */ + template<typename K, typename V> + class ContinuousQueryImpl : public ContinuousQueryImplBase + { + public: + /** + * Destructor. + */ + virtual ~ContinuousQueryImpl() + { + // No-op. + } + + /** + * Constructor. + * + * @param lsnr Event listener. Invoked on the node where + * continuous query execution has been started. + */ + ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V>>& lsnr) : + ContinuousQueryImplBase(false), + lsnr(lsnr) + { + // No-op. + } + + /** + * Constructor. + * + * @param lsnr Event listener Invoked on the node where + * continuous query execution has been started. + * @param loc Whether query should be executed locally. + */ + ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V>>& lsnr, bool loc) : + ContinuousQueryImplBase(loc), + lsnr(lsnr) + { + // No-op. + } + + /** + * Set cache entry event listener. + * + * @param val Cache entry event listener. Invoked on the + * node where continuous query execution has been + * started. + */ + void SetListener(Reference<ignite::cache::event::CacheEntryEventListener<K, V>>& val) + { + lsnr = val; + } + + /** + * Check if the query has listener. + * + * @return True if the query has listener. + */ + bool HasListener() const + { + return !lsnr.IsNull(); + } + + /** + * Get cache entry event listener. + * + * @return Cache entry event listener. + */ + const ignite::cache::event::CacheEntryEventListener<K, V>& GetListener() const + { + return lsnr.Get(); + } + + /** + * Get cache entry event listener. + * + * @return Cache entry event listener. + */ + ignite::cache::event::CacheEntryEventListener<K, V>& GetListener() + { + return lsnr.Get(); + } + + /** + * Callback that reads and processes cache events. + * + * @param reader Reader to use. + */ + virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader& reader) + { + // Number of events. + int32_t cnt = reader.ReadInt32(); + + // Storing events here. + std::vector< ignite::cache::CacheEntryEvent<K, V> > events; + events.resize(cnt); + + for (int32_t i = 0; i < cnt; ++i) + events[i].Read(reader); + + lsnr.Get().OnEvent(events.data(), cnt); + } + + private: + /** Cache entry event listener. */ + Reference<ignite::cache::event::CacheEntryEventListener<K, V>> lsnr; + }; + } + } + } + } +} + +#endif //_IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h b/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h index 107042a..3c4d123 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h +++ b/modules/platforms/cpp/core/include/ignite/impl/handle_registry.h @@ -28,21 +28,9 @@ namespace ignite namespace impl { /** - * Something what can be registered inside handle registry. - */ - class IGNITE_IMPORT_EXPORT HandleRegistryEntry - { - public: - /** - * Destructor. - */ - virtual ~HandleRegistryEntry(); - }; - - /** * Handle registry segment containing thread-specific data for slow-path access. */ - class IGNITE_IMPORT_EXPORT HandleRegistrySegment + class HandleRegistrySegment { public: /** @@ -61,7 +49,7 @@ namespace ignite * @param hnd Handle. * @return Associated entry or NULL if it doesn't exists. */ - ignite::common::concurrent::SharedPointer<HandleRegistryEntry> Get(int64_t hnd); + common::concurrent::SharedPointer<void> Get(int64_t hnd); /** * Put entry into segment. @@ -69,14 +57,14 @@ namespace ignite * @param hnd Handle. * @param entry Associated entry (cannot be NULL). */ - void Put(int64_t hnd, const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& entry); + void Put(int64_t hnd, const common::concurrent::SharedPointer<void>& entry); /** * Remove entry from the segment. * * @param hnd Handle. */ - void Remove(int64_t hnd); + void Remove(int64_t hnd); /** * Clear all entries from the segment. @@ -84,10 +72,10 @@ namespace ignite void Clear(); private: /** Map with data. */ - std::map<int64_t, ignite::common::concurrent::SharedPointer<HandleRegistryEntry>>* map; + std::map<int64_t, common::concurrent::SharedPointer<void>> map; /** Mutex. */ - ignite::common::concurrent::CriticalSection* mux; + common::concurrent::CriticalSection mux; IGNITE_NO_COPY_ASSIGNMENT(HandleRegistrySegment); }; @@ -102,7 +90,7 @@ namespace ignite * Constructor. * * @param fastCap Fast-path capacity. - * @param segmentCnt Slow-path segments count. + * @param slowSegmentCnt Slow-path segments count. */ HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt); @@ -117,7 +105,7 @@ namespace ignite * @param target Target. * @return Handle. */ - int64_t Allocate(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target); + int64_t Allocate(const common::concurrent::SharedPointer<void>& target); /** * Allocate handle in critical mode. @@ -125,7 +113,7 @@ namespace ignite * @param target Target. * @return Handle. */ - int64_t AllocateCritical(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target); + int64_t AllocateCritical(const common::concurrent::SharedPointer<void>& target); /** * Allocate handle in safe mode. @@ -133,7 +121,7 @@ namespace ignite * @param target Target. * @return Handle. */ - int64_t AllocateSafe(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target); + int64_t AllocateSafe(const common::concurrent::SharedPointer<void>& target); /** * Allocate handle in critical and safe modes. @@ -141,7 +129,7 @@ namespace ignite * @param target Target. * @return Handle. */ - int64_t AllocateCriticalSafe(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target); + int64_t AllocateCriticalSafe(const common::concurrent::SharedPointer<void>& target); /** * Release handle. @@ -154,35 +142,36 @@ namespace ignite * Get target. * * @param hnd Handle. - * @param Target. + * @return Target. */ - ignite::common::concurrent::SharedPointer<HandleRegistryEntry> Get(int64_t hnd); + common::concurrent::SharedPointer<void> Get(int64_t hnd); /** * Close the registry. */ void Close(); + private: /** Fast-path container capacity. */ - int32_t fastCap; + int32_t fastCap; /** Fast-path counter. */ - int32_t fastCtr; + int32_t fastCtr; /** Fast-path container. */ - ignite::common::concurrent::SharedPointer<HandleRegistryEntry>* fast; + common::concurrent::SharedPointer<void>* fast; /** Amount of slow-path segments. */ - int32_t slowSegmentCnt; + int32_t slowSegmentCnt; /** Slow-path counter. */ - int64_t slowCtr; - + int64_t slowCtr; + /** Slow-path segments. */ - HandleRegistrySegment** slow; + HandleRegistrySegment** slow; /** Close flag. */ - int32_t closed; + int32_t closed; IGNITE_NO_COPY_ASSIGNMENT(HandleRegistry); @@ -190,11 +179,10 @@ namespace ignite * Internal allocation routine. * * @param target Target. - * @param Critical mode flag. - * @param Safe mode flag. + * @param critical mode flag. + * @param safe mode flag. */ - int64_t Allocate0(const ignite::common::concurrent::SharedPointer<HandleRegistryEntry>& target, - bool critical, bool safe); + int64_t Allocate0(const common::concurrent::SharedPointer<void>& target, bool critical, bool safe); }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h index fb6f657..2b2a117 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h +++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h @@ -20,14 +20,15 @@ #include <ignite/common/concurrent.h> #include <ignite/jni/java.h> +#include <ignite/jni/utils.h> #include "ignite/impl/interop/interop_memory.h" #include "ignite/impl/binary/binary_type_manager.h" -#include "ignite/jni/utils.h" +#include "ignite/impl/handle_registry.h" -namespace ignite +namespace ignite { - namespace impl + namespace impl { /** * Defines environment in which Ignite operates. @@ -41,6 +42,16 @@ namespace ignite enum { DEFAULT_ALLOCATION_SIZE = 1024 }; /** + * Default fast path handle registry containers capasity. + */ + enum { DEFAULT_FAST_PATH_CONTAINERS_CAP = 1024 }; + + /** + * Default slow path handle registry containers capasity. + */ + enum { DEFAULT_SLOW_PATH_CONTAINERS_CAP = 16 }; + + /** * Default constructor. */ IgniteEnvironment(); @@ -78,6 +89,13 @@ namespace ignite void OnStartCallback(long long memPtr, jobject proc); /** + * Continuous query listener apply callback. + * + * @param mem Memory with data. + */ + void OnContinuousQueryListenerApply(common::concurrent::SharedPointer<interop::InteropMemory>& mem); + + /** * Get name of Ignite instance. * * @return Name. @@ -133,6 +151,13 @@ namespace ignite */ void ProcessorReleaseStart(); + /** + * Get handle registry. + * + * @return Handle registry. + */ + HandleRegistry& GetHandleRegistry(); + private: /** Context to access Java. */ common::concurrent::SharedPointer<jni::java::JniContext> ctx; @@ -152,6 +177,9 @@ namespace ignite /** Type updater. */ binary::BinaryTypeUpdater* metaUpdater; + /** Handle registry. */ + HandleRegistry registry; + IGNITE_NO_COPY_ASSIGNMENT(IgniteEnvironment); }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/namespaces.dox ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/namespaces.dox b/modules/platforms/cpp/core/namespaces.dox index 0f5f11f..49379e6 100644 --- a/modules/platforms/cpp/core/namespaces.dox +++ b/modules/platforms/cpp/core/namespaces.dox @@ -22,40 +22,54 @@ * computing and transacting on large-scale data sets in real-time, orders of magnitude faster than possible with * traditional disk-based or flash-based technologies. */ - + /** * Apache %Ignite API. */ namespace ignite { - /** - * %Ignite Binary Objects API. - */ - namespace binary - { - // Empty. - } + /** + * %Ignite Binary Objects API. + */ + namespace binary + { + // Empty. + } + + /** + * %Ignite %Transaction API. + */ + namespace transactions + { + // Empty. + } - /** - * %Ignite %Transaction API. - */ - namespace transactions - { - // Empty. - } - - /** - * %Ignite %Cache API. - */ - namespace cache - { - /** - * Contains APIs for creating and executing cache queries. - */ - namespace query - { - // Empty. - } - } + /** + * %Ignite %Cache API. + */ + namespace cache + { + /** + * Contains APIs for cache events. + */ + namespace event + { + // Empty. + } + + /** + * Contains APIs for creating and executing cache queries. + */ + namespace query + { + /** + * Contains APIs for continuous queries. + */ + namespace continuous + { + // Empty. + } + } + } } - + http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/project/vs/core.vcxproj ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj index 6320323..89a2dff 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj @@ -193,6 +193,10 @@ <ClInclude Include="..\..\include\ignite\cache\cache.h" /> <ClInclude Include="..\..\include\ignite\cache\cache_entry.h" /> <ClInclude Include="..\..\include\ignite\cache\cache_peek_mode.h" /> + <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event.h" /> + <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_listener.h" /> + <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query.h" /> + <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query_handle.h" /> <ClInclude Include="..\..\include\ignite\cache\query\query.h" /> <ClInclude Include="..\..\include\ignite\cache\query\query_argument.h" /> <ClInclude Include="..\..\include\ignite\cache\query\query_cursor.h" /> @@ -208,6 +212,8 @@ <ClInclude Include="..\..\include\ignite\impl\binary\binary_type_updater_impl.h" /> <ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h" /> <ClInclude Include="..\..\include\ignite\impl\cache\query\query_batch.h" /> + <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_handle_impl.h" /> + <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_impl.h" /> <ClInclude Include="..\..\include\ignite\impl\cache\query\query_fields_row_impl.h" /> <ClInclude Include="..\..\include\ignite\impl\cache\query\query_impl.h" /> <ClInclude Include="..\..\include\ignite\impl\ignite_environment.h" /> @@ -229,6 +235,7 @@ <ClCompile Include="..\..\src\impl\binary\binary_type_updater_impl.cpp" /> <ClCompile Include="..\..\src\impl\cache\cache_impl.cpp" /> <ClCompile Include="..\..\src\impl\cache\query\query_batch.cpp" /> + <ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp" /> <ClCompile Include="..\..\src\impl\cache\query\query_impl.cpp" /> <ClCompile Include="..\..\src\impl\ignite_environment.cpp" /> <ClCompile Include="..\..\src\impl\ignite_impl.cpp" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/project/vs/core.vcxproj.filters ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters index c5fb532..9cb5f78 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters @@ -46,6 +46,9 @@ <ClCompile Include="..\..\src\impl\cache\query\query_batch.cpp"> <Filter>Code\impl\cache\query</Filter> </ClCompile> + <ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp"> + <Filter>Code\impl\cache\query\continuous</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h"> @@ -144,6 +147,24 @@ <ClInclude Include="..\..\include\ignite\impl\cache\query\query_batch.h"> <Filter>Code\impl\cache\query</Filter> </ClInclude> + <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query_handle.h"> + <Filter>Code\cache\query\continuous</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query.h"> + <Filter>Code\cache\query\continuous</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_handle_impl.h"> + <Filter>Code\impl\cache\query\continuous</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_listener.h"> + <Filter>Code\cache\event</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event.h"> + <Filter>Code\cache\event</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_impl.h"> + <Filter>Code\impl\cache\query\continuous</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <Filter Include="Code"> @@ -176,5 +197,14 @@ <Filter Include="Code\transactions"> <UniqueIdentifier>{146fe661-0ad3-4d51-83a3-fce8a897e84d}</UniqueIdentifier> </Filter> + <Filter Include="Code\cache\query\continuous"> + <UniqueIdentifier>{2056dfc8-4ced-4658-b2b7-a8c81a7ef797}</UniqueIdentifier> + </Filter> + <Filter Include="Code\impl\cache\query\continuous"> + <UniqueIdentifier>{d633f819-7b30-4e26-81ec-f708d1c1ff8e}</UniqueIdentifier> + </Filter> + <Filter Include="Code\cache\event"> + <UniqueIdentifier>{e03c3690-ff22-4c78-83a0-b77cebb7f980}</UniqueIdentifier> + </Filter> </ItemGroup> </Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp index 0630921..ad69d45 100644 --- a/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cache/cache_impl.cpp @@ -26,9 +26,11 @@ using namespace ignite::java; using namespace ignite::common; using namespace ignite::cache; using namespace ignite::cache::query; +using namespace ignite::cache::query::continuous; using namespace ignite::impl; using namespace ignite::impl::binary; using namespace ignite::impl::cache::query; +using namespace ignite::impl::cache::query::continuous; using namespace ignite::impl::interop; using namespace ignite::binary; @@ -89,6 +91,9 @@ namespace ignite /** Operation: PutIfAbsent. */ const int32_t OP_PUT_IF_ABSENT = 28; + /** Operation: CONTINUOUS query. */ + const int32_t OP_QRY_CONTINUOUS = 29; + /** Operation: SCAN query. */ const int32_t OP_QRY_SCAN = 30; @@ -301,6 +306,32 @@ namespace ignite { return QueryInternal(qry, OP_QRY_SQL_FIELDS, err); } + + ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry, + const SqlQuery& initialQry, IgniteError& err) + { + return QueryContinuous(qry, initialQry, OP_QRY_SQL, OP_QRY_CONTINUOUS, err); + } + + ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry, + const TextQuery& initialQry, IgniteError& err) + { + return QueryContinuous(qry, initialQry, OP_QRY_TEXT, OP_QRY_CONTINUOUS, err); + } + + ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry, + const ScanQuery& initialQry, IgniteError& err) + { + return QueryContinuous(qry, initialQry, OP_QRY_SCAN, OP_QRY_CONTINUOUS, err); + } + + ContinuousQueryHandleImpl* CacheImpl::QueryContinuous(const SharedPointer<ContinuousQueryImplBase> qry, + IgniteError& err) + { + struct { void Write(BinaryRawWriter&) const { }} dummy; + + return QueryContinuous(qry, dummy, -1, OP_QRY_CONTINUOUS, err); + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp new file mode 100644 index 0000000..04e64c9 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/cache/query/continuous/continuous_query_handle_impl.cpp @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ignite/impl/cache/query/continuous/continuous_query_handle_impl.h" + +using namespace ignite::common::concurrent; +using namespace ignite::jni::java; +using namespace ignite::java; +using namespace ignite::impl::interop; +using namespace ignite::impl::binary; + +namespace ignite +{ + namespace impl + { + namespace cache + { + namespace query + { + namespace continuous + { + enum Command + { + GET_INITIAL_QUERY = 0, + + CLOSE = 1 + }; + + ContinuousQueryHandleImpl::ContinuousQueryHandleImpl(SP_IgniteEnvironment env, int64_t handle, jobject javaRef) : + env(env), + handle(handle), + javaRef(javaRef), + mutex(), + extracted(false) + { + // No-op. + } + + ContinuousQueryHandleImpl::~ContinuousQueryHandleImpl() + { + env.Get()->Context()->TargetInLongOutLong(javaRef, CLOSE, 0); + + JniContext::Release(javaRef); + + env.Get()->GetHandleRegistry().Release(handle); + } + + QueryCursorImpl* ContinuousQueryHandleImpl::GetInitialQueryCursor(IgniteError& err) + { + CsLockGuard guard(mutex); + + if (extracted) + { + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "GetInitialQueryCursor() can be called only once."); + + return 0; + } + + JniErrorInfo jniErr; + + jobject res = env.Get()->Context()->TargetOutObject(javaRef, GET_INITIAL_QUERY, &jniErr); + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, &err); + + if (jniErr.code != IGNITE_JNI_ERR_SUCCESS) + return 0; + + extracted = true; + + return new QueryCursorImpl(env, res); + } + + void ContinuousQueryHandleImpl::SetQuery(SP_ContinuousQueryImplBase query) + { + qry = query; + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/598b464f/modules/platforms/cpp/core/src/impl/handle_registry.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/handle_registry.cpp b/modules/platforms/cpp/core/src/impl/handle_registry.cpp index c447faa..069e996 100644 --- a/modules/platforms/cpp/core/src/impl/handle_registry.cpp +++ b/modules/platforms/cpp/core/src/impl/handle_registry.cpp @@ -23,83 +23,67 @@ namespace ignite { namespace impl { - HandleRegistryEntry::~HandleRegistryEntry() - { - // No-op. - } - HandleRegistrySegment::HandleRegistrySegment() : - map(new std::map<int64_t, SharedPointer<HandleRegistryEntry>>()), mux(new CriticalSection()) + map(), + mux() { // No-op. } HandleRegistrySegment::~HandleRegistrySegment() { - delete map; - delete mux; + // No-op. } - SharedPointer<HandleRegistryEntry> HandleRegistrySegment::Get(int64_t hnd) + SharedPointer<void> HandleRegistrySegment::Get(int64_t hnd) { - mux->Enter(); + typedef std::map<int64_t, SharedPointer<void>> Map; - SharedPointer<HandleRegistryEntry> res = (*map)[hnd]; + CsLockGuard guard(mux); - mux->Leave(); + Map::const_iterator it = map.find(hnd); + if (it == map.end()) + return SharedPointer<void>(); - return res; + return it->second; } - void HandleRegistrySegment::Put(int64_t hnd, const SharedPointer<HandleRegistryEntry>& entry) + void HandleRegistrySegment::Put(int64_t hnd, const SharedPointer<void>& entry) { - mux->Enter(); - - (*map)[hnd] = entry; + CsLockGuard guard(mux); - mux->Leave(); + map[hnd] = entry; } void HandleRegistrySegment::Remove(int64_t hnd) { - mux->Enter(); + CsLockGuard guard(mux); - map->erase(hnd); - - mux->Leave(); + map.erase(hnd); } void HandleRegistrySegment::Clear() { - mux->Enter(); - - map->erase(map->begin(), map->end()); + CsLockGuard guard(mux); - mux->Leave(); + map.clear(); } - HandleRegistry::HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt) + HandleRegistry::HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt) : + fastCap(fastCap), + fastCtr(0), + fast(new SharedPointer<void>[fastCap]), + slowSegmentCnt(slowSegmentCnt), + slowCtr(fastCap), + slow(new HandleRegistrySegment*[slowSegmentCnt]), + closed(0) { - this->fastCap = fastCap; + for (int32_t i = 0; i < fastCap; i++) + fast[i] = SharedPointer<void>(); - fastCtr = 0; - - fast = new SharedPointer<HandleRegistryEntry>[fastCap]; - - for (int i = 0; i < fastCap; i++) - fast[i] = SharedPointer<HandleRegistryEntry>(); - - this->slowSegmentCnt = slowSegmentCnt; - - slowCtr = fastCap; - - slow = new HandleRegistrySegment*[slowSegmentCnt]; - - for (int i = 0; i < slowSegmentCnt; i++) + for (int32_t i = 0; i < slowSegmentCnt; i++) slow[i] = new HandleRegistrySegment(); - closed = 0; - Memory::Fence(); } @@ -115,22 +99,22 @@ namespace ignite delete[] slow; } - int64_t HandleRegistry::Allocate(const SharedPointer<HandleRegistryEntry>& target) + int64_t HandleRegistry::Allocate(const SharedPointer<void>& target) { return Allocate0(target, false, false); } - int64_t HandleRegistry::AllocateCritical(const SharedPointer<HandleRegistryEntry>& target) + int64_t HandleRegistry::AllocateCritical(const SharedPointer<void>& target) { return Allocate0(target, true, false); } - int64_t HandleRegistry::AllocateSafe(const SharedPointer<HandleRegistryEntry>& target) + int64_t HandleRegistry::AllocateSafe(const SharedPointer<void>& target) { return Allocate0(target, false, true); } - int64_t HandleRegistry::AllocateCriticalSafe(const SharedPointer<HandleRegistryEntry>& target) + int64_t HandleRegistry::AllocateCriticalSafe(const SharedPointer<void>& target) { return Allocate0(target, true, true); } @@ -138,10 +122,10 @@ namespace ignite void HandleRegistry::Release(int64_t hnd) { if (hnd < fastCap) - fast[static_cast<int32_t>(hnd)] = SharedPointer<HandleRegistryEntry>(); + fast[static_cast<int32_t>(hnd)] = SharedPointer<void>(); else { - HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt); + HandleRegistrySegment* segment = slow[hnd % slowSegmentCnt]; segment->Remove(hnd); } @@ -149,7 +133,7 @@ namespace ignite Memory::Fence(); } - SharedPointer<HandleRegistryEntry> HandleRegistry::Get(int64_t hnd) + SharedPointer<void> HandleRegistry::Get(int64_t hnd) { Memory::Fence(); @@ -157,7 +141,7 @@ namespace ignite return fast[static_cast<int32_t>(hnd)]; else { - HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt); + HandleRegistrySegment* segment = slow[hnd % slowSegmentCnt]; return segment->Get(hnd); } @@ -168,16 +152,16 @@ namespace ignite if (Atomics::CompareAndSet32(&closed, 0, 1)) { // Cleanup fast-path handles. - for (int i = 0; i < fastCap; i++) - fast[i] = SharedPointer<HandleRegistryEntry>(); + for (int32_t i = 0; i < fastCap; i++) + fast[i] = SharedPointer<void>(); // Cleanup slow-path handles. - for (int i = 0; i < slowSegmentCnt; i++) - (*(slow + i))->Clear(); + for (int32_t i = 0; i < slowSegmentCnt; i++) + slow[i]->Clear(); } } - int64_t HandleRegistry::Allocate0(const SharedPointer<HandleRegistryEntry>& target, bool critical, bool safe) + int64_t HandleRegistry::Allocate0(const SharedPointer<void>& target, bool critical, bool safe) { // Check closed state. Memory::Fence(); @@ -201,7 +185,7 @@ namespace ignite if (safe && closed == 1) { - fast[fastIdx] = SharedPointer<HandleRegistryEntry>(); + fast[fastIdx] = SharedPointer<void>(); return -1; } @@ -214,7 +198,7 @@ namespace ignite // Either allocating on slow-path, or fast-path can no longer accomodate more entries. int64_t slowIdx = Atomics::IncrementAndGet64(&slowCtr) - 1; - HandleRegistrySegment* segment = *(slow + slowIdx % slowSegmentCnt); + HandleRegistrySegment* segment = slow[slowIdx % slowSegmentCnt]; segment->Put(slowIdx, target);
