http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp new file mode 100644 index 0000000..607d330 --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.cpp @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/impl/thin/writable_key.h> + +#include "impl/response_status.h" +#include "impl/message.h" +#include "impl/cache/cache_client_impl.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + namespace cache + { + CacheClientImpl::CacheClientImpl( + const SP_DataRouter& router, + const std::string& name, + int32_t id) : + router(router), + name(name), + id(id), + binary(false) + { + // No-op. + } + + CacheClientImpl::~CacheClientImpl() + { + // No-op. + } + + template<typename ReqT, typename RspT> + void CacheClientImpl::SyncCacheKeyMessage(const WritableKey& key, const ReqT& req, RspT& rsp) + { + SP_CacheAffinityInfo affinityInfo = router.Get()->GetAffinityMapping(id); + + if (!affinityInfo.IsValid()) + { + router.Get()->SyncMessage(req, rsp); + } + else + { + const EndPoints& endPoints = affinityInfo.Get()->GetMapping(key); + + router.Get()->SyncMessage(req, rsp, endPoints); + } + } + + void CacheClientImpl::Put(const WritableKey& key, const Writable& value) + { + CachePutRequest req(id, binary, key, value); + Response rsp; + + SyncCacheKeyMessage(key, req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + } + + void CacheClientImpl::Get(const WritableKey& key, Readable& value) + { + CacheKeyRequest<RequestType::CACHE_GET> req(id, binary, key); + CacheGetResponse rsp(value); + + SyncCacheKeyMessage(key, req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + } + + bool CacheClientImpl::ContainsKey(const WritableKey& key) + { + CacheKeyRequest<RequestType::CACHE_CONTAINS_KEY> req(id, binary, key); + BoolResponse rsp; + + SyncCacheKeyMessage(key, req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + + return rsp.GetValue(); + } + + int64_t CacheClientImpl::GetSize(int32_t peekModes) + { + CacheGetSizeRequest req(id, binary, peekModes); + Int64Response rsp; + + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + + return rsp.GetValue(); + } + + bool CacheClientImpl::Remove(const WritableKey& key) + { + CacheKeyRequest<RequestType::CACHE_REMOVE_KEY> req(id, binary, key); + BoolResponse rsp; + + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + + return rsp.GetValue(); + } + + void CacheClientImpl::RemoveAll() + { + CacheRequest<RequestType::CACHE_REMOVE_ALL> req(id, binary); + Response rsp; + + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + } + + void CacheClientImpl::Clear(const WritableKey& key) + { + CacheKeyRequest<RequestType::CACHE_CLEAR_KEY> req(id, binary, key); + Response rsp; + + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + } + + void CacheClientImpl::Clear() + { + CacheRequest<RequestType::CACHE_CLEAR> req(id, binary); + Response rsp; + + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + } + + void CacheClientImpl::LocalPeek(const WritableKey& key, Readable& value) + { + CacheKeyRequest<RequestType::CACHE_LOCAL_PEEK> req(id, binary, key); + CacheGetResponse rsp(value); + + SyncCacheKeyMessage(key, req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + } + + void CacheClientImpl::RefreshAffinityMapping() + { + router.Get()->RefreshAffinityMapping(id, binary); + } + } + } + } +} +
http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h new file mode 100644 index 0000000..a28c4dd --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_impl.h @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_THIN_CACHE_CACHE_CLIENT_IMPL +#define _IGNITE_IMPL_THIN_CACHE_CACHE_CLIENT_IMPL + +#include <stdint.h> +#include <string> + +#include "impl/data_router.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + /* Forward declaration. */ + class Readable; + + /* Forward declaration. */ + class Writable; + + /* Forward declaration. */ + class WritableKey; + + namespace cache + { + /** + * Ignite client class implementation. + * + * This is an entry point for Thin C++ Ignite client. Its main + * purpose is to establish connection to the remote server node. + */ + class CacheClientImpl + { + public: + /** + * Constructor. + * + * @param router Data router instance. + * @param name Cache name. + * @param id Cache ID. + */ + CacheClientImpl( + const SP_DataRouter& router, + const std::string& name, + int32_t id); + + /** + * Destructor. + */ + ~CacheClientImpl(); + + /** + * Put value to cache. + * + * @param key Key. + * @param value Value. + */ + void Put(const WritableKey& key, const Writable& value); + + /** + * Get value from cache. + * + * @param key Key. + * @param value Value. + */ + void Get(const WritableKey& key, Readable& value); + + /** + * Check if the cache contains a value for the specified key. + * + * @param key Key whose presence in this cache is to be tested. + * @return @c true if the cache contains specified key. + */ + bool ContainsKey(const WritableKey& key); + + /** + * Gets the number of all entries cached across all nodes. + * @note This operation is distributed and will query all + * participating nodes for their cache sizes. + * + * @param peekModes Peek modes mask. + * @return Cache size across all nodes. + */ + int64_t GetSize(int32_t peekModes); + + /** + * Removes given key mapping from cache. If cache previously contained value for the given key, + * then this value is returned. In case of PARTITIONED or REPLICATED caches, the value will be + * loaded from the primary node, which in its turn may load the value from the disk-based swap + * storage, and consecutively, if it's not in swap, from the underlying persistent storage. + * If the returned value is not needed, method removex() should always be used instead of this + * one to avoid the overhead associated with returning of the previous value. + * If write-through is enabled, the value will be removed from store. + * This method is transactional and will enlist the entry into ongoing transaction if there is one. + * + * @param key Key whose mapping is to be removed from cache. + * @return False if there was no matching key. + */ + bool Remove(const WritableKey& key); + + /** + * Removes all mappings from cache. + * If write-through is enabled, the value will be removed from store. + * This method is transactional and will enlist the entry into ongoing transaction if there is one. + */ + void RemoveAll(); + + /** + * Clear entry from the cache and swap storage, without notifying listeners or CacheWriters. + * Entry is cleared only if it is not currently locked, and is not participating in a transaction. + * + * @param key Key to clear. + */ + void Clear(const WritableKey& key); + + /** + * Clear cache. + */ + void Clear(); + + /** + * Peeks at in-memory cached value using default optional + * peek mode. This method will not load value from any + * persistent store or from a remote node. + * + * Use for testing purposes only. + * + * @param key Key whose presence in this cache is to be tested. + * @param value Value. + */ + void LocalPeek(const WritableKey& key, Readable& value); + + /** + * Update cache partitions info. + */ + void RefreshAffinityMapping(); + + private: + /** + * Synchronously send request message and receive response. + * + * @param key Key. + * @param req Request message. + * @param rsp Response message. + * @throw IgniteError on error. + */ + template<typename ReqT, typename RspT> + void SyncCacheKeyMessage(const WritableKey& key, const ReqT& req, RspT& rsp); + + /** Data router. */ + SP_DataRouter router; + + /** Cache name. */ + std::string name; + + /** Cache ID. */ + int32_t id; + + /** Binary flag. */ + bool binary; + }; + + typedef common::concurrent::SharedPointer<CacheClientImpl> SP_CacheClientImpl; + } + } + } +} +#endif // _IGNITE_IMPL_THIN_CACHE_CACHE_CLIENT_IMPL http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp new file mode 100644 index 0000000..1fc8ac9 --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/cache/cache_client_proxy.cpp @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/impl/thin/cache/cache_client_proxy.h> + +#include "impl/cache/cache_client_impl.h" + +using namespace ignite::impl::thin; +using namespace cache; + +namespace +{ + using namespace ignite::common::concurrent; + + CacheClientImpl& GetCacheImpl(SharedPointer<void>& ptr) + { + return *reinterpret_cast<CacheClientImpl*>(ptr.Get()); + } + + const CacheClientImpl& GetCacheImpl(const SharedPointer<void>& ptr) + { + return *reinterpret_cast<const CacheClientImpl*>(ptr.Get()); + } +} + +namespace ignite +{ + namespace impl + { + namespace thin + { + namespace cache + { + void CacheClientProxy::Put(const WritableKey& key, const Writable& value) + { + GetCacheImpl(impl).Put(key, value); + } + + void CacheClientProxy::Get(const WritableKey& key, Readable& value) + { + GetCacheImpl(impl).Get(key, value); + } + + bool CacheClientProxy::ContainsKey(const WritableKey & key) + { + return GetCacheImpl(impl).ContainsKey(key); + } + + int64_t CacheClientProxy::GetSize(int32_t peekModes) + { + return GetCacheImpl(impl).GetSize(peekModes); + } + + void CacheClientProxy::LocalPeek(const WritableKey& key, Readable& value) + { + GetCacheImpl(impl).LocalPeek(key, value); + } + + void CacheClientProxy::RefreshAffinityMapping() + { + GetCacheImpl(impl).RefreshAffinityMapping(); + } + + bool CacheClientProxy::Remove(const WritableKey& key) + { + return GetCacheImpl(impl).Remove(key); + } + + void CacheClientProxy::RemoveAll() + { + GetCacheImpl(impl).RemoveAll(); + } + + void CacheClientProxy::Clear(const WritableKey& key) + { + GetCacheImpl(impl).Clear(key); + } + + void CacheClientProxy::Clear() + { + GetCacheImpl(impl).Clear(); + } + } + } + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/connectable_node_partitions.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/connectable_node_partitions.h b/modules/platforms/cpp/thin-client/src/impl/connectable_node_partitions.h new file mode 100644 index 0000000..a6bbb86 --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/connectable_node_partitions.h @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_THIN_CONNECTABLE_NODE_PARTITIONS +#define _IGNITE_IMPL_THIN_CONNECTABLE_NODE_PARTITIONS + +#include <stdint.h> + +#include "impl/net/end_point.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + /** + * Address of the node, connectible for the thin client, associated + * with cache partitions info. + */ + class ConnectableNodePartitions + { + public: + /** + * Default constructor. + */ + ConnectableNodePartitions() : + endPoints(), + partitions() + { + // No-op. + } + + /** + * Destructor. + */ + ~ConnectableNodePartitions() + { + // No-op. + } + + /** + * Get end points. + * + * @return End points. + */ + const std::vector<net::EndPoint>& GetEndPoints() const + { + return endPoints; + } + + /** + * Get cache partitions for this node. + * + * @return Cache partitions. + */ + const std::vector<int32_t>& GetPartitions() const + { + return partitions; + } + + /** + * Read from data stream, using provided reader. + * + * @param reader Reader. + */ + void Read(binary::BinaryReaderImpl& reader) + { + int32_t port = reader.ReadInt32(); + + int32_t addrNum = reader.ReadInt32(); + + endPoints.clear(); + endPoints.reserve(addrNum); + + for (int32_t i = 0; i < addrNum; ++i) + { + std::string addr; + reader.ReadString(addr); + + endPoints.push_back(net::EndPoint(addr, static_cast<uint16_t>(port))); + } + + int32_t partsNum = reader.ReadInt32(); + + partitions.clear(); + partitions.reserve(addrNum); + + for (int32_t i = 0; i < partsNum; ++i) + partitions.push_back(reader.ReadInt32()); + } + + private: + /** Node end points. */ + std::vector<net::EndPoint> endPoints; + + /** Cache partitions. */ + std::vector<int32_t> partitions; + }; + } + } +} + +#endif //_IGNITE_IMPL_THIN_CONNECTABLE_NODE_PARTITIONS \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp new file mode 100644 index 0000000..e1fffc5 --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.cpp @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cstring> +#include <cstddef> + +#include <sstream> + +#include <ignite/common/fixed_size_array.h> + +#include "impl/message.h" +#include "impl/ssl/ssl_gateway.h" +#include "impl/ssl/secure_socket_client.h" +#include "impl/net/tcp_socket_client.h" +#include "impl/net/remote_type_updater.h" + +#include "impl/data_channel.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + /** Version 1.1.0. */ + const ProtocolVersion DataChannel::VERSION_1_1_0(1, 1, 0); + + /** Current version. */ + const ProtocolVersion DataChannel::VERSION_CURRENT(VERSION_1_1_0); + + DataChannel::VersionSet::value_type supportedArray[] = { + DataChannel::VERSION_1_1_0, + }; + + const DataChannel::VersionSet DataChannel::supportedVersions(supportedArray, + supportedArray + (sizeof(supportedArray) / sizeof(supportedArray[0]))); + + DataChannel::DataChannel(const ignite::thin::IgniteClientConfiguration& cfg, + binary::BinaryTypeManager& typeMgr) : + ioMutex(), + address(), + config(cfg), + typeMgr(typeMgr), + currentVersion(VERSION_CURRENT), + reqIdCounter(0), + socket() + { + // No-op. + } + + DataChannel::~DataChannel() + { + // No-op. + } + + bool DataChannel::Connect(const std::string& host, uint16_t port, int32_t timeout) + { + using ignite::thin::SslMode; + + if (socket.get() != 0) + throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "Already connected."); + + if (config.GetEndPoints().empty()) + throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_ARGUMENT, "No valid address to connect."); + + SslMode::Type sslMode = config.GetSslMode(); + + if (sslMode != SslMode::DISABLE) + { + ssl::SslGateway::GetInstance().LoadAll(); + + socket.reset(new ssl::SecureSocketClient(config.GetSslCertFile(), + config.GetSslKeyFile(), config.GetSslCaFile())); + } + else + socket.reset(new net::TcpSocketClient()); + + address.host = host; + address.port = port; + + return TryRestoreConnection(timeout); + } + + void DataChannel::Close() + { + if (socket.get() != 0) + { + socket->Close(); + + socket.reset(); + } + } + + void DataChannel::InternalSyncMessage(interop::InteropUnpooledMemory& mem, int32_t timeout) + { + common::concurrent::CsLockGuard lock(ioMutex); + + bool success = Send(mem.Data(), mem.Length(), timeout); + + if (!success) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "Can not send message to remote host: timeout"); + + success = Receive(mem, timeout); + + if (!success) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "Can not send message to remote host: timeout"); + } + + bool DataChannel::Send(const int8_t* data, size_t len, int32_t timeout) + { + if (socket.get() == 0) + throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "Connection is not established"); + + OperationResult::T res = SendAll(data, len, timeout); + + if (res == OperationResult::TIMEOUT) + return false; + + if (res == OperationResult::FAIL) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "Can not send message due to connection failure"); + + return true; + } + + DataChannel::OperationResult::T DataChannel::SendAll(const int8_t* data, size_t len, int32_t timeout) + { + int sent = 0; + + while (sent != static_cast<int64_t>(len)) + { + int res = socket->Send(data + sent, len - sent, timeout); + + if (res < 0 || res == SocketClient::WaitResult::TIMEOUT) + { + Close(); + + return res < 0 ? OperationResult::FAIL : OperationResult::TIMEOUT; + } + + sent += res; + } + + assert(static_cast<size_t>(sent) == len); + + return OperationResult::SUCCESS; + } + + bool DataChannel::Receive(interop::InteropMemory& msg, int32_t timeout) + { + assert(msg.Capacity() > 4); + + if (socket.get() == 0) + throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_STATE, "DataChannel is not established"); + + // Message size + msg.Length(4); + + OperationResult::T res = ReceiveAll(msg.Data(), static_cast<size_t>(msg.Length()), timeout); + + if (res == OperationResult::TIMEOUT) + return false; + + if (res == OperationResult::FAIL) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Can not receive message header"); + + interop::InteropInputStream inStream(&msg); + + int32_t msgLen = inStream.ReadInt32(); + + if (msgLen < 0) + { + Close(); + + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Protocol error: Message length is negative"); + } + + if (msgLen == 0) + return true; + + if (msg.Capacity() < msgLen + 4) + msg.Reallocate(msgLen + 4); + + msg.Length(4 + msgLen); + + res = ReceiveAll(msg.Data() + 4, msgLen, timeout); + + if (res == OperationResult::TIMEOUT) + return false; + + if (res == OperationResult::FAIL) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "Connection failure: Can not receive message body"); + + return true; + } + + DataChannel::OperationResult::T DataChannel::ReceiveAll(void* dst, size_t len, int32_t timeout) + { + size_t remain = len; + int8_t* buffer = reinterpret_cast<int8_t*>(dst); + + while (remain) + { + size_t received = len - remain; + + int res = socket->Receive(buffer + received, remain, timeout); + + if (res < 0 || res == SocketClient::WaitResult::TIMEOUT) + { + Close(); + + return res < 0 ? OperationResult::FAIL : OperationResult::TIMEOUT; + } + + remain -= static_cast<size_t>(res); + } + + return OperationResult::SUCCESS; + } + + bool DataChannel::MakeRequestHandshake(const ProtocolVersion& propVer, + ProtocolVersion& resVer, int32_t timeout) + { + currentVersion = propVer; + + resVer = ProtocolVersion(); + bool accepted = false; + + try + { + // Workaround for some Linux systems that report connection on non-blocking + // sockets as successful but fail to establish real connection. + accepted = Handshake(propVer, resVer, timeout); + } + catch (const IgniteError&) + { + return false; + } + + if (!accepted) + return false; + + resVer = propVer; + + return true; + } + + bool DataChannel::Handshake(const ProtocolVersion& propVer, ProtocolVersion& resVer, int32_t timeout) + { + // Allocating 4KB just in case. + enum { BUFFER_SIZE = 1024 * 4 }; + + common::concurrent::CsLockGuard lock(ioMutex); + + interop::InteropUnpooledMemory mem(BUFFER_SIZE); + interop::InteropOutputStream outStream(&mem); + binary::BinaryWriterImpl writer(&outStream, 0); + + int32_t lenPos = outStream.Reserve(4); + writer.WriteInt8(RequestType::HANDSHAKE); + + writer.WriteInt16(propVer.GetMajor()); + writer.WriteInt16(propVer.GetMinor()); + writer.WriteInt16(propVer.GetMaintenance()); + + writer.WriteInt8(ClientType::THIN_CLIENT); + + writer.WriteString(config.GetUser()); + writer.WriteString(config.GetPassword()); + + outStream.WriteInt32(lenPos, outStream.Position() - 4); + + bool success = Send(mem.Data(), outStream.Position(), timeout); + + if (!success) + return false; + + success = Receive(mem, timeout); + + if (!success) + return false; + + interop::InteropInputStream inStream(&mem); + + inStream.Position(4); + + binary::BinaryReaderImpl reader(&inStream); + + bool accepted = reader.ReadBool(); + + if (!accepted) + { + int16_t major = reader.ReadInt16(); + int16_t minor = reader.ReadInt16(); + int16_t maintenance = reader.ReadInt16(); + + resVer = ProtocolVersion(major, minor, maintenance); + + std::string error; + reader.ReadString(error); + + reader.ReadInt32(); + + return false; + } + + return true; + } + + bool DataChannel::EnsureConnected(int32_t timeout) + { + if (socket.get() != 0) + return false; + + return TryRestoreConnection(timeout); + } + + bool DataChannel::NegotiateProtocolVersion(int32_t timeout) + { + ProtocolVersion resVer; + ProtocolVersion propVer = VERSION_CURRENT; + + bool success = MakeRequestHandshake(propVer, resVer, timeout); + + while (!success) + { + if (resVer == propVer || !IsVersionSupported(resVer)) + return false; + + propVer = resVer; + + success = MakeRequestHandshake(propVer, resVer, timeout); + } + + return true; + } + + bool DataChannel::TryRestoreConnection(int32_t timeout) + { + bool connected = false; + + connected = socket->Connect(address.host.c_str(), address.port, timeout); + + if (!connected) + { + Close(); + + return false; + } + + connected = NegotiateProtocolVersion(timeout); + + if (!connected) + { + Close(); + + return false; + } + + return true; + } + + bool DataChannel::IsVersionSupported(const ProtocolVersion& ver) + { + return supportedVersions.find(ver) != supportedVersions.end(); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/data_channel.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/data_channel.h b/modules/platforms/cpp/thin-client/src/impl/data_channel.h new file mode 100644 index 0000000..20e7628 --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/data_channel.h @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_THIN_DATA_CHANNEL +#define _IGNITE_IMPL_THIN_DATA_CHANNEL + +#include <stdint.h> + +#include <memory> + +#include <ignite/thin/ignite_client_configuration.h> + +#include <ignite/common/concurrent.h> + +#include <ignite/impl/interop/interop_output_stream.h> +#include <ignite/impl/binary/binary_writer_impl.h> + +#include "impl/protocol_version.h" +#include "impl/net/end_point.h" +#include "impl/socket_client.h" + +namespace ignite +{ + namespace impl + { + namespace interop + { + // Forward declaration. + class InteropMemory; + } + + namespace thin + { + /** + * Data router. + * + * Ensures there is a connection between client and one of the servers + * and routes data between them. + */ + class DataChannel + { + public: + /** Version set type. */ + typedef std::set<ProtocolVersion> VersionSet; + + /** Version 1.1.0. */ + static const ProtocolVersion VERSION_1_1_0; + + /** Current version. */ + static const ProtocolVersion VERSION_CURRENT; + + /** + * Operation with timeout result. + */ + struct OperationResult + { + enum T + { + SUCCESS, + FAIL, + TIMEOUT + }; + }; + + /** + * Constructor. + * + * @param cfg Configuration. + * @param typeMgr Type manager. + */ + DataChannel(const ignite::thin::IgniteClientConfiguration& cfg, binary::BinaryTypeManager& typeMgr); + + /** + * Destructor. + */ + ~DataChannel(); + + /** + * Establish connection to cluster. + * + * @param host Host. + * @param port Port. + * @param timeout Timeout. + * @return @c true on success. + */ + bool Connect(const std::string& host, uint16_t port, int32_t timeout); + + /** + * Close connection. + */ + void Close(); + + /** + * Synchronously send request message and receive response. + * Uses provided timeout. Does not try to restore connection on + * fail. + * + * @param req Request message. + * @param rsp Response message. + * @param timeout Timeout. + * @throw IgniteError on error. + */ + template<typename ReqT, typename RspT> + void SyncMessage(const ReqT& req, RspT& rsp, int32_t timeout) + { + // Allocating 64KB to lessen number of reallocations. + enum { BUFFER_SIZE = 1024 * 64 }; + + interop::InteropUnpooledMemory mem(BUFFER_SIZE); + + int64_t id = GenerateRequestMessage(req, mem); + + InternalSyncMessage(mem, timeout); + + interop::InteropInputStream inStream(&mem); + + inStream.Position(4); + + int64_t rspId = inStream.ReadInt64(); + + if (id != rspId) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, + "Protocol error: Response message ID does not equal Request ID"); + + binary::BinaryReaderImpl reader(&inStream); + + rsp.Read(reader, currentVersion); + } + + /** + * Send message stored in memory and synchronously receives + * response and stores it in the same memory. + * + * @param mem Memory. + * @param timeout Opration timeout. + */ + void InternalSyncMessage(interop::InteropUnpooledMemory& mem, int32_t timeout); + + /** + * Get address. + * @return Address. + */ + const net::EndPoint& GetAddress() const + { + return address; + } + + private: + IGNITE_NO_COPY_ASSIGNMENT(DataChannel); + + /** + * Generate request ID. + * + * Atomicaly generates and returns new Request ID. + * + * @return Unique Request ID. + */ + int64_t GenerateRequestId() + { + return common::concurrent::Atomics::IncrementAndGet64(&reqIdCounter); + } + + /** + * Generate message to send. + * + * @param req Request to serialize. + * @param mem Memory to write request to. + * @return Message ID. + */ + template<typename ReqT> + int64_t GenerateRequestMessage(const ReqT& req, interop::InteropUnpooledMemory& mem) + { + interop::InteropOutputStream outStream(&mem); + binary::BinaryWriterImpl writer(&outStream, &typeMgr); + + // Space for RequestSize + OperationCode + RequestID. + outStream.Reserve(4 + 2 + 8); + + req.Write(writer, currentVersion); + + int64_t id = GenerateRequestId(); + + outStream.WriteInt32(0, outStream.Position() - 4); + outStream.WriteInt16(4, ReqT::GetOperationCode()); + outStream.WriteInt64(6, id); + + outStream.Synchronize(); + + return id; + } + + /** + * Send data by established connection. + * + * @param data Data buffer. + * @param len Data length. + * @param timeout Timeout. + * @return @c true on success, @c false on timeout. + * @throw IgniteError on error. + */ + bool Send(const int8_t* data, size_t len, int32_t timeout); + + /** + * Receive next message. + * + * @param msg Buffer for message. + * @param timeout Timeout. + * @return @c true on success, @c false on timeout. + * @throw IgniteError on error. + */ + bool Receive(interop::InteropMemory& msg, int32_t timeout); + + /** + * Receive specified number of bytes. + * + * @param dst Buffer for data. + * @param len Number of bytes to receive. + * @param timeout Timeout. + * @return Operation result. + */ + OperationResult::T ReceiveAll(void* dst, size_t len, int32_t timeout); + + /** + * Send specified number of bytes. + * + * @param data Data buffer. + * @param len Data length. + * @param timeout Timeout. + * @return Operation result. + */ + OperationResult::T SendAll(const int8_t* data, size_t len, int32_t timeout); + + /** + * Perform handshake request. + * + * @param propVer Proposed protocol version. + * @param resVer Resulted version. + * @param timeout Timeout. + * @return @c true on success and @c false otherwise. + */ + bool MakeRequestHandshake(const ProtocolVersion& propVer, ProtocolVersion& resVer, int32_t timeout); + + /** + * Synchronously send handshake request message and receive + * handshake response. Uses provided timeout. Does not try to + * restore connection on fail. + * + * @param propVer Proposed protocol version. + * @param resVer Resulted version. + * @param timeout Timeout. + * @return @c true if accepted. + * @throw IgniteError on error. + */ + bool Handshake(const ProtocolVersion& propVer, ProtocolVersion& resVer, int32_t timeout); + + /** + * Ensure there is a connection to the cluster. + * + * @param timeout Timeout. + * @return @c false on error. + */ + bool EnsureConnected(int32_t timeout); + + /** + * Negotiate protocol version with current host + * + * @param timeout Timeout. + * @return @c true on success and @c false otherwise. + */ + bool NegotiateProtocolVersion(int32_t timeout); + + /** + * Try to restore connection to the cluster. + * + * @param timeout Timeout. + * @return @c true on success and @c false otherwise. + */ + bool TryRestoreConnection(int32_t timeout); + + /** + * Check if the version is supported. + * + * @param ver Version. + * @return True if the version is supported. + */ + static bool IsVersionSupported(const ProtocolVersion& ver); + + /** Set of supported versions. */ + const static VersionSet supportedVersions; + + /** Sync IO mutex. */ + common::concurrent::CriticalSection ioMutex; + + /** Remote host address. */ + net::EndPoint address; + + /** Configuration. */ + const ignite::thin::IgniteClientConfiguration& config; + + /** Metadata manager. */ + binary::BinaryTypeManager& typeMgr; + + /** Protocol version. */ + ProtocolVersion currentVersion; + + /** Request ID counter. */ + int64_t reqIdCounter; + + /** Client Socket. */ + std::auto_ptr<SocketClient> socket; + }; + + /** Shared pointer type. */ + typedef common::concurrent::SharedPointer<DataChannel> SP_DataChannel; + } + } +} + +#endif //_IGNITE_IMPL_THIN_DATA_CHANNEL http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/data_router.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.cpp b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp new file mode 100644 index 0000000..018e6df --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/data_router.cpp @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cstring> +#include <cstddef> +#include <cstdlib> + +#include <sstream> +#include <iterator> +#include <algorithm> + +#include "impl/utility.h" +#include "impl/data_router.h" +#include "impl/message.h" +#include "impl/response_status.h" +#include "impl/ssl/ssl_gateway.h" +#include "impl/net/remote_type_updater.h" +#include "impl/net/net_utils.h" +#include "ignite/impl/thin/writable_key.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + DataRouter::DataRouter(const ignite::thin::IgniteClientConfiguration& cfg) : + ioTimeout(DEFALT_IO_TIMEOUT), + connectionTimeout(DEFALT_CONNECT_TIMEOUT), + config(cfg), + ranges(), + localAddresses(), + typeUpdater(), + typeMgr() + { + srand(common::GetRandSeed()); + + typeUpdater.reset(new net::RemoteTypeUpdater(*this)); + + typeMgr.SetUpdater(typeUpdater.get()); + + CollectAddresses(config.GetEndPoints(), ranges); + } + + DataRouter::~DataRouter() + { + // No-op. + } + + void DataRouter::Connect() + { + using ignite::thin::SslMode; + + UpdateLocalAddresses(); + + channels.clear(); + + if (config.GetEndPoints().empty()) + throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_ARGUMENT, "No valid address to connect."); + + for (std::vector<net::TcpRange>::iterator it = ranges.begin(); it != ranges.end(); ++it) + { + net::TcpRange& range = *it; + + for (uint16_t port = range.port; port <= range.port + range.range; ++port) + { + SP_DataChannel channel(new DataChannel(config, typeMgr)); + + bool connected = channel.Get()->Connect(range.host, port, connectionTimeout); + + if (connected) + { + common::concurrent::CsLockGuard lock(channelsMutex); + + channels[channel.Get()->GetAddress()].Swap(channel); + + break; + } + } + + if (!channels.empty()) + break; + } + + if (channels.empty()) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, "Failed to establish connection with any host."); + } + + void DataRouter::Close() + { + typeMgr.SetUpdater(0); + + std::map<net::EndPoint, SP_DataChannel>::iterator it; + + common::concurrent::CsLockGuard lock(channelsMutex); + + for (it = channels.begin(); it != channels.end(); ++it) + { + DataChannel* channel = it->second.Get(); + + if (channel) + channel->Close(); + } + } + + void DataRouter::RefreshAffinityMapping(int32_t cacheId, bool binary) + { + std::vector<ConnectableNodePartitions> nodeParts; + + CacheRequest<RequestType::CACHE_NODE_PARTITIONS> req(cacheId, binary); + ClientCacheNodePartitionsResponse rsp(nodeParts); + + SyncMessageNoMetaUpdate(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_CACHE, rsp.GetError().c_str()); + + cache::SP_CacheAffinityInfo newMapping(new cache::CacheAffinityInfo(nodeParts)); + + common::concurrent::CsLockGuard lock(cacheAffinityMappingMutex); + + cache::SP_CacheAffinityInfo& affinityInfo = cacheAffinityMapping[cacheId]; + affinityInfo.Swap(newMapping); + } + + cache::SP_CacheAffinityInfo DataRouter::GetAffinityMapping(int32_t cacheId) + { + common::concurrent::CsLockGuard lock(cacheAffinityMappingMutex); + + return cacheAffinityMapping[cacheId]; + } + + void DataRouter::ReleaseAffinityMapping(int32_t cacheId) + { + common::concurrent::CsLockGuard lock(cacheAffinityMappingMutex); + + cacheAffinityMapping.erase(cacheId); + } + + SP_DataChannel DataRouter::GetRandomChannel() + { + int r = rand(); + + common::concurrent::CsLockGuard lock(channelsMutex); + + size_t idx = r % channels.size(); + + std::map<net::EndPoint, SP_DataChannel>::iterator it = channels.begin(); + + std::advance(it, idx); + + return it->second; + } + + bool DataRouter::IsLocalHost(const std::vector<net::EndPoint>& hint) + { + for (std::vector<net::EndPoint>::const_iterator it = hint.begin(); it != hint.end(); ++it) + { + const std::string& host = it->host; + + if (IsLocalAddress(host)) + continue; + + if (localAddresses.find(host) == localAddresses.end()) + return false; + } + + return true; + } + + bool DataRouter::IsLocalAddress(const std::string& host) + { + static const std::string s127("127"); + + bool ipv4 = std::count(host.begin(), host.end(), '.') == 3; + + if (ipv4) + return host.compare(0, 3, s127) == 0; + + return host == "::1" || host == "0:0:0:0:0:0:0:1"; + } + + bool DataRouter::IsProvidedByUser(const net::EndPoint& endPoint) + { + for (std::vector<net::TcpRange>::iterator it = ranges.begin(); it != ranges.end(); ++it) + { + if (it->host == endPoint.host && + endPoint.port >= it->port && + endPoint.port <= it->port + it->range) + return true; + } + + return false; + } + + SP_DataChannel DataRouter::GetBestChannel(const std::vector<net::EndPoint>& hint) + { + if (hint.empty()) + return GetRandomChannel(); + + bool localHost = IsLocalHost(hint); + + for (std::vector<net::EndPoint>::const_iterator it = hint.begin(); it != hint.end(); ++it) + { + if (IsLocalAddress(it->host) && !localHost) + continue; + + if (!IsProvidedByUser(*it)) + continue; + + common::concurrent::CsLockGuard lock(channelsMutex); + + SP_DataChannel& dst = channels[*it]; + + if (dst.IsValid()) + return dst; + + SP_DataChannel channel(new DataChannel(config, typeMgr)); + + bool connected = channel.Get()->Connect(it->host, it->port, connectionTimeout); + + if (connected) + { + dst.Swap(channel); + + return dst; + } + } + + return GetRandomChannel(); + } + + void DataRouter::UpdateLocalAddresses() + { + localAddresses.clear(); + + net::net_utils::GetLocalAddresses(localAddresses); + } + + void DataRouter::CollectAddresses(const std::string& str, std::vector<net::TcpRange>& ranges) + { + ranges.clear(); + + utility::ParseAddress(str, ranges, DEFAULT_PORT); + + std::random_shuffle(ranges.begin(), ranges.end()); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/data_router.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/data_router.h b/modules/platforms/cpp/thin-client/src/impl/data_router.h new file mode 100644 index 0000000..c3ca62d --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/data_router.h @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_THIN_DATA_ROUTER +#define _IGNITE_IMPL_THIN_DATA_ROUTER + +#include <stdint.h> + +#include <map> +#include <set> +#include <vector> +#include <memory> +#include <string> + +#include <ignite/thin/ignite_client_configuration.h> + +#include <ignite/common/concurrent.h> +#include <ignite/impl/binary/binary_writer_impl.h> + +#include "impl/data_channel.h" +#include "impl/net/end_point.h" +#include "impl/net/tcp_range.h" +#include "impl/cache/cache_affinity_info.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + // Forward declaration. + class WritableKey; + + /** + * Data router. + * + * Ensures there is a connection between client and one of the servers + * and routes data between them. + */ + class DataRouter + { + public: + /** Connection establishment timeout in seconds. */ + enum { DEFALT_CONNECT_TIMEOUT = 5 }; + + /** Network IO operation timeout in seconds. */ + enum { DEFALT_IO_TIMEOUT = 5 }; + + /** Default port. */ + enum { DEFAULT_PORT = 10800 }; + + /** + * Constructor. + * + * @param cfg Configuration. + */ + DataRouter(const ignite::thin::IgniteClientConfiguration& cfg); + + /** + * Destructor. + */ + ~DataRouter(); + + /** + * Establish connection to cluster. + */ + void Connect(); + + /** + * Close connection. + */ + void Close(); + + /** + * Synchronously send request message and receive response. + * Uses provided timeout. + * + * @param req Request message. + * @param rsp Response message. + * @throw IgniteError on error. + */ + template<typename ReqT, typename RspT> + void SyncMessage(const ReqT& req, RspT& rsp) + { + SP_DataChannel channel = GetRandomChannel(); + + int32_t metaVer = typeMgr.GetVersion(); + + channel.Get()->SyncMessage(req, rsp, ioTimeout); + + if (typeMgr.IsUpdatedSince(metaVer)) + { + IgniteError err; + + if (!typeMgr.ProcessPendingUpdates(err)) + throw err; + } + } + + /** + * Synchronously send request message and receive response. + * + * @param req Request message. + * @param rsp Response message. + * @param hint End points of the preferred server node to use. + * @throw IgniteError on error. + */ + template<typename ReqT, typename RspT> + void SyncMessage(const ReqT& req, RspT& rsp, const std::vector<net::EndPoint>& hint) + { + SP_DataChannel channel = GetBestChannel(hint); + + int32_t metaVer = typeMgr.GetVersion(); + + channel.Get()->SyncMessage(req, rsp, ioTimeout); + + if (typeMgr.IsUpdatedSince(metaVer)) + { + IgniteError err; + + if (!typeMgr.ProcessPendingUpdates(err)) + throw err; + } + } + + /** + * Synchronously send request message and receive response. + * Does not update metadata. + * Uses provided timeout. + * + * @param req Request message. + * @param rsp Response message. + * @throw IgniteError on error. + */ + template<typename ReqT, typename RspT> + void SyncMessageNoMetaUpdate(const ReqT& req, RspT& rsp) + { + SP_DataChannel channel = GetRandomChannel(); + + channel.Get()->SyncMessage(req, rsp, ioTimeout); + } + + /** + * Update affinity mapping for the cache. + * + * @param cacheId Cache ID. + * @param binary Cache binary flag. + */ + void RefreshAffinityMapping(int32_t cacheId, bool binary); + + /** + * Get affinity mapping for the cache. + * + * @param cacheId Cache ID. + * @return Mapping. + */ + cache::SP_CacheAffinityInfo GetAffinityMapping(int32_t cacheId); + + /** + * Clear affinity mapping for the cache. + * + * @param cacheId Cache ID. + */ + void ReleaseAffinityMapping(int32_t cacheId); + + private: + IGNITE_NO_COPY_ASSIGNMENT(DataRouter); + + /** End point collection. */ + typedef std::vector<net::EndPoint> EndPoints; + + /** Shared pointer to end points. */ + typedef common::concurrent::SharedPointer<EndPoints> SP_EndPoints; + + /** + * Get endpoints for the key. + * Always using Rendezvous Affinity Function algorithm for now. + * + * @param cacheId Cache ID. + * @param key Key. + * @return Endpoints for the key. + */ + int32_t GetPartitionForKey(int32_t cacheId, const WritableKey& key); + + /** + * Get random data channel. + * + * @return Random data channel. + */ + SP_DataChannel GetRandomChannel(); + + /** + * Check whether the provided address hint is the local host. + * + * @param hint Hint. + * @return @c true if the local host. + */ + bool IsLocalHost(const std::vector<net::EndPoint>& hint); + + /** + * Check whether the provided address is the local host. + * + * @param host Host. + * @return @c true if the local host. + */ + static bool IsLocalAddress(const std::string& host); + + /** + * Check whether the provided end point is provided by user using configuration. + * + * @param endPoint End point to check. + * @return @c true if provided by user using configuration. + */ + bool IsProvidedByUser(const net::EndPoint& endPoint); + + /** + * Get the best data channel. + * + * @param hint End points of the preferred server node to use. + * @return The best available data channel. + */ + SP_DataChannel GetBestChannel(const std::vector<net::EndPoint>& hint); + + /** + * Update local addresses. + */ + void UpdateLocalAddresses(); + + /** + * Collect all addresses from string. + * + * @param str String with connection strings to parse. + * @param ranges Address ranges. + */ + static void CollectAddresses(const std::string& str, std::vector<net::TcpRange>& ranges); + + /** IO timeout in seconds. */ + int32_t ioTimeout; + + /** Connection timeout in seconds. */ + int32_t connectionTimeout; + + /** Configuration. */ + ignite::thin::IgniteClientConfiguration config; + + /** Address ranges. */ + std::vector<net::TcpRange> ranges; + + /** Local addresses. */ + std::set<std::string> localAddresses; + + /** Type updater. */ + std::auto_ptr<binary::BinaryTypeUpdater> typeUpdater; + + /** Metadata manager. */ + binary::BinaryTypeManager typeMgr; + + /** Data channels. */ + std::map<net::EndPoint, SP_DataChannel> channels; + + /** Channels mutex. */ + common::concurrent::CriticalSection channelsMutex; + + /** Cache affinity mapping. */ + std::map<int32_t, cache::SP_CacheAffinityInfo> cacheAffinityMapping; + + /** Cache affinity mapping mutex. */ + common::concurrent::CriticalSection cacheAffinityMappingMutex; + }; + + /** Shared pointer type. */ + typedef common::concurrent::SharedPointer<DataRouter> SP_DataRouter; + } + } +} + +#endif //_IGNITE_IMPL_THIN_DATA_ROUTER http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp new file mode 100644 index 0000000..b539cd7 --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.cpp @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "impl/utility.h" +#include "impl/cache/cache_client_impl.h" +#include "impl/message.h" +#include "impl/response_status.h" + +#include "impl/ignite_client_impl.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + IgniteClientImpl::IgniteClientImpl(const ignite::thin::IgniteClientConfiguration& cfg) : + cfg(cfg), + router(new DataRouter(cfg)) + { + // No-op. + } + + IgniteClientImpl::~IgniteClientImpl() + { + // No-op. + } + + void IgniteClientImpl::Start() + { + router.Get()->Connect(); + } + + cache::SP_CacheClientImpl IgniteClientImpl::GetCache(const char* name) const + { + CheckCacheName(name); + + int32_t cacheId = utility::GetCacheId(name); + + return MakeCacheImpl(router, name, cacheId); + } + + cache::SP_CacheClientImpl IgniteClientImpl::GetOrCreateCache(const char* name) + { + CheckCacheName(name); + + int32_t cacheId = utility::GetCacheId(name); + + GetOrCreateCacheWithNameRequest req(name); + Response rsp; + + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, rsp.GetError().c_str()); + + return MakeCacheImpl(router, name, cacheId); + } + + cache::SP_CacheClientImpl IgniteClientImpl::CreateCache(const char* name) + { + CheckCacheName(name); + + int32_t cacheId = utility::GetCacheId(name); + + CreateCacheWithNameRequest req(name); + Response rsp; + + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, rsp.GetError().c_str()); + + return MakeCacheImpl(router, name, cacheId); + } + + void IgniteClientImpl::DestroyCache(const char* name) + { + CheckCacheName(name); + + int32_t cacheId = utility::GetCacheId(name); + + DestroyCacheRequest req(cacheId); + Response rsp; + + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, rsp.GetError().c_str()); + + router.Get()->ReleaseAffinityMapping(cacheId); + } + + void IgniteClientImpl::GetCacheNames(std::vector<std::string>& cacheNames) + { + Request<RequestType::CACHE_GET_NAMES> req; + GetCacheNamesResponse rsp(cacheNames); + + router.Get()->SyncMessage(req, rsp); + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, rsp.GetError().c_str()); + } + + common::concurrent::SharedPointer<cache::CacheClientImpl> IgniteClientImpl::MakeCacheImpl( + const SP_DataRouter& router, + const std::string& name, + int32_t id) + { + cache::SP_CacheClientImpl cache(new cache::CacheClientImpl(router, name, id)); + + cache.Get()->RefreshAffinityMapping(); + + return cache; + } + + void IgniteClientImpl::CheckCacheName(const char* name) + { + if (!name || !strlen(name)) + throw IgniteError(IgniteError::IGNITE_ERR_ILLEGAL_ARGUMENT, "Specified cache name is not allowed"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h new file mode 100644 index 0000000..4ca67fe --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/ignite_client_impl.h @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _IGNITE_IMPL_THIN_IGNITE_CLIENT_IMPL +#define _IGNITE_IMPL_THIN_IGNITE_CLIENT_IMPL + +#include <ignite/thin/ignite_client.h> +#include <ignite/thin/ignite_client_configuration.h> + +#include "impl/data_router.h" +#include "impl/cache/cache_client_impl.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + /** + * Ignite client class implementation. + * + * This is an entry point for Thin C++ Ignite client. Its main + * purpose is to establish connection to the remote server node. + */ + class IgniteClientImpl + { + public: + /** + * Constructor. + * + * @param cfg Configuration. + */ + IgniteClientImpl(const ignite::thin::IgniteClientConfiguration& cfg); + + /** + * Destructor. + */ + ~IgniteClientImpl(); + + /** + * Start client. + * + * @throw IgnitError on inability to connect. + */ + void Start(); + + /** + * Get cache. + * + * @param name Cache name. + * @return Cache. + */ + common::concurrent::SharedPointer<cache::CacheClientImpl> GetCache(const char* name) const; + + /** + * Get or create cache. + * + * @param name Cache name. + * @return Cache. + */ + common::concurrent::SharedPointer<cache::CacheClientImpl> GetOrCreateCache(const char* name); + + /** + * Create cache. + * + * @param name Cache name. + * @return Cache. + */ + common::concurrent::SharedPointer<cache::CacheClientImpl> CreateCache(const char* name); + + /** + * Destroy cache by name. + * + * @param name Cache name. + */ + void DestroyCache(const char* name); + + /** + * Get names of currently available caches or an empty collection + * if no caches are available. + * + * @param cacheNames Cache names. Output parameter. + */ + void GetCacheNames(std::vector<std::string>& cacheNames); + + private: + + /** + * Make cache implementation. + * + * @param router Data router instance. + * @param name Cache name. + * @param id Cache ID. + * @return Cache implementation. + */ + static common::concurrent::SharedPointer<cache::CacheClientImpl> MakeCacheImpl( + const SP_DataRouter& router, + const std::string& name, + int32_t id); + + /** + * Check cache name. + * + * @param name Cache name. + * @throw IgniteError if the name is not valid. + */ + static void CheckCacheName(const char* name); + + /** Configuration. */ + const ignite::thin::IgniteClientConfiguration cfg; + + /** Data router. */ + SP_DataRouter router; + }; + } + } +} +#endif // _IGNITE_IMPL_THIN_IGNITE_CLIENT_IMPL http://git-wip-us.apache.org/repos/asf/ignite/blob/ed658597/modules/platforms/cpp/thin-client/src/impl/message.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/thin-client/src/impl/message.cpp b/modules/platforms/cpp/thin-client/src/impl/message.cpp new file mode 100644 index 0000000..7a99136 --- /dev/null +++ b/modules/platforms/cpp/thin-client/src/impl/message.cpp @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <ignite/binary/binary_raw_reader.h> +#include <ignite/thin/cache/cache_peek_mode.h> + +#include <ignite/impl/thin/writable.h> +#include <ignite/impl/thin/readable.h> + +#include "impl/response_status.h" +#include "impl/message.h" + +namespace ignite +{ + namespace impl + { + namespace thin + { + GetOrCreateCacheWithNameRequest::GetOrCreateCacheWithNameRequest(const std::string& name) : + name(name) + { + // No-op. + } + + void GetOrCreateCacheWithNameRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const + { + writer.WriteString(name); + } + + CreateCacheWithNameRequest::CreateCacheWithNameRequest(const std::string& name) : + name(name) + { + // No-op. + } + + void CreateCacheWithNameRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const + { + writer.WriteString(name); + } + + Response::Response(): + status(ResponseStatus::FAILED) + { + // No-op. + } + + Response::~Response() + { + // No-op. + } + + void Response::Read(binary::BinaryReaderImpl& reader, const ProtocolVersion& ver) + { + status = reader.ReadInt32(); + + if (status == ResponseStatus::SUCCESS) + ReadOnSuccess(reader, ver); + else + reader.ReadString(error); + } + + CachePutRequest::CachePutRequest(int32_t cacheId, bool binary, const Writable& key, const Writable& value) : + CacheKeyRequest<RequestType::CACHE_PUT>(cacheId, binary, key), + value(value) + { + // No-op. + } + + void CachePutRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const + { + CacheKeyRequest<RequestType::CACHE_PUT>::Write(writer, ver); + + value.Write(writer); + } + + ClientCacheNodePartitionsResponse::ClientCacheNodePartitionsResponse( + std::vector<ConnectableNodePartitions>& nodeParts): + nodeParts(nodeParts) + { + // No-op. + } + + ClientCacheNodePartitionsResponse::~ClientCacheNodePartitionsResponse() + { + // No-op. + } + + void ClientCacheNodePartitionsResponse::ReadOnSuccess( + binary::BinaryReaderImpl& reader, const ProtocolVersion&) + { + int32_t num = reader.ReadInt32(); + + nodeParts.clear(); + nodeParts.resize(static_cast<size_t>(num)); + + for (int32_t i = 0; i < num; ++i) + nodeParts[i].Read(reader); + } + + CacheGetResponse::CacheGetResponse(Readable& value) : + value(value) + { + // No-op. + } + + CacheGetResponse::~CacheGetResponse() + { + // No-op. + } + + void CacheGetResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&) + { + value.Read(reader); + } + + void BinaryTypeGetRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const + { + writer.WriteInt32(typeId); + } + + void BinaryTypePutRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const + { + writer.WriteInt32(snapshot.GetTypeId()); + writer.WriteString(snapshot.GetTypeName()); + + // Affinity Key Field name. + writer.WriteNull(); + + const binary::Snap::FieldMap& fields = snapshot.GetFieldMap(); + + writer.WriteInt32(static_cast<int32_t>(fields.size())); + + for (binary::Snap::FieldMap::const_iterator it = fields.begin(); it != fields.end(); ++it) + { + writer.WriteString(it->first); + writer.WriteInt32(it->second.GetTypeId()); + writer.WriteInt32(it->second.GetFieldId()); + } + + // Is enum: always false for now as we do not support enums. + writer.WriteBool(false); + + // Schemas. Compact schema is not supported for now. + writer.WriteInt32(0); + } + + void BinaryTypeGetResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&) + { + int32_t typeId = reader.ReadInt32(); + + std::string typeName; + reader.ReadString(typeName); + + // Unused for now. + std::string affKeyFieldNameUnused; + reader.ReadString(affKeyFieldNameUnused); + + snapshot = binary::SPSnap(new binary::Snap(typeName, typeId)); + + int32_t fieldsNum = reader.ReadInt32(); + + for (int32_t i = 0; i < fieldsNum; ++i) + { + std::string fieldName; + reader.ReadString(fieldName); + + int32_t fieldTypeId = reader.ReadInt32(); + int32_t fieldId = reader.ReadInt32(); + + snapshot.Get()->AddField(fieldId, fieldName, fieldTypeId); + } + + // Check if the type is enum. + bool isEnum = reader.ReadBool(); + + if (isEnum) + throw IgniteError(IgniteError::IGNITE_ERR_BINARY, "Enum types is not supported."); + + // Ignoring schemas for now. + } + + void DestroyCacheRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion&) const + { + writer.WriteInt32(cacheId); + } + + void GetCacheNamesResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&) + { + int32_t len = reader.ReadInt32(); + + cacheNames.reserve(static_cast<size_t>(len)); + + for (int32_t i = 0; i < len; i++) + { + std::string res; + reader.ReadString(res); + + cacheNames.push_back(res); + } + } + + void BoolResponse::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&) + { + value = reader.ReadBool(); + } + + CacheGetSizeRequest::CacheGetSizeRequest(int32_t cacheId, bool binary, int32_t peekModes) : + CacheRequest<RequestType::CACHE_GET_SIZE>(cacheId, binary), + peekModes(peekModes) + { + // No-op. + } + + void CacheGetSizeRequest::Write(binary::BinaryWriterImpl& writer, const ProtocolVersion& ver) const + { + CacheRequest<RequestType::CACHE_GET_SIZE>::Write(writer, ver); + + if (peekModes & ignite::thin::cache::CachePeekMode::ALL) + { + // Size. + writer.WriteInt32(1); + + writer.WriteInt8(0); + + return; + } + + interop::InteropOutputStream* stream = writer.GetStream(); + + // Reserve size. + int32_t sizePos = stream->Reserve(4); + + if (peekModes & ignite::thin::cache::CachePeekMode::NEAR_CACHE) + stream->WriteInt8(1); + + if (peekModes & ignite::thin::cache::CachePeekMode::PRIMARY) + stream->WriteInt8(2); + + if (peekModes & ignite::thin::cache::CachePeekMode::BACKUP) + stream->WriteInt8(3); + + if (peekModes & ignite::thin::cache::CachePeekMode::ONHEAP) + stream->WriteInt8(4); + + if (peekModes & ignite::thin::cache::CachePeekMode::OFFHEAP) + stream->WriteInt8(5); + + int32_t size = stream->Position() - sizePos - 4; + + stream->WriteInt32(sizePos, size); + + stream->Synchronize(); + } + + void Int64Response::ReadOnSuccess(binary::BinaryReaderImpl& reader, const ProtocolVersion&) + { + value = reader.ReadInt64(); + } + } + } +} +
