This is an automated email from the ASF dual-hosted git repository.
alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new 70185ee4d GEODE-10300: C++ native client: Allow locator responses
greater than 3000 bytes (#970)
70185ee4d is described below
commit 70185ee4d2a4b34bf83d753a648a84a3f91554e7
Author: Alberto Gomez <[email protected]>
AuthorDate: Thu Aug 18 09:39:47 2022 +0200
GEODE-10300: C++ native client: Allow locator responses greater than
3000 bytes (#970)
* GEODE-10300: Fix locator response size limit in C++ client
If a response message from the locator to the C++ native client
is longer than 3000 bytes, the client only reads the first
3000 bytes and never reads the rest of the response.
* GEODE-10300: Updated after review
* GEODE-10300: Updated after review
* GEODE-10300: Updated after another review
* GEODE-10300: Updated after some more reviews
* GEODE-10300: Some more changes after review.
* GEODE-10300: Small change after review
* GEODE-10300: Remove unneeded space
---
cppcache/include/geode/DataInput.hpp | 90 ++++++-------
cppcache/src/Connector.hpp | 5 +
cppcache/src/DataInput.cpp | 16 +--
cppcache/src/GetAllServersResponse.cpp | 10 +-
cppcache/src/GetAllServersResponse.hpp | 12 +-
cppcache/src/StreamDataInput.cpp | 81 ++++++++++++
...tAllServersResponse.cpp => StreamDataInput.hpp} | 51 +++++---
cppcache/src/TcpConn.cpp | 6 +-
cppcache/src/TcpConn.hpp | 2 +
cppcache/src/ThinClientLocatorHelper.cpp | 20 +--
cppcache/test/CMakeLists.txt | 1 +
cppcache/test/StreamDataInputTest.cpp | 139 +++++++++++++++++++++
.../mock/ConnectorMock.hpp} | 44 ++++---
13 files changed, 365 insertions(+), 112 deletions(-)
diff --git a/cppcache/include/geode/DataInput.hpp
b/cppcache/include/geode/DataInput.hpp
index 23a771af8..fde6c3aea 100644
--- a/cppcache/include/geode/DataInput.hpp
+++ b/cppcache/include/geode/DataInput.hpp
@@ -73,7 +73,7 @@ class APACHE_GEODE_EXPORT DataInput {
*/
inline bool readBoolean() {
_GEODE_CHECK_BUFFER_SIZE(1);
- return *(m_buf++) == 1 ? true : false;
+ return *(buffer_++) == 1 ? true : false;
}
/**
@@ -89,8 +89,8 @@ class APACHE_GEODE_EXPORT DataInput {
inline void readBytesOnly(uint8_t* buffer, size_t len) {
if (len > 0) {
_GEODE_CHECK_BUFFER_SIZE(len);
- std::memcpy(buffer, m_buf, len);
- m_buf += len;
+ std::memcpy(buffer, buffer_, len);
+ buffer_ += len;
}
}
@@ -107,8 +107,8 @@ class APACHE_GEODE_EXPORT DataInput {
inline void readBytesOnly(int8_t* buffer, size_t len) {
if (len > 0) {
_GEODE_CHECK_BUFFER_SIZE(len);
- std::memcpy(buffer, m_buf, len);
- m_buf += len;
+ std::memcpy(buffer, buffer_, len);
+ buffer_ += len;
}
}
@@ -129,8 +129,8 @@ class APACHE_GEODE_EXPORT DataInput {
if (length > 0) {
_GEODE_CHECK_BUFFER_SIZE(length);
_GEODE_NEW(buffer, uint8_t[length]);
- std::memcpy(buffer, m_buf, length);
- m_buf += length;
+ std::memcpy(buffer, buffer_, length);
+ buffer_ += length;
}
*bytes = buffer;
}
@@ -152,8 +152,8 @@ class APACHE_GEODE_EXPORT DataInput {
if (length > 0) {
_GEODE_CHECK_BUFFER_SIZE(length);
_GEODE_NEW(buffer, int8_t[length]);
- std::memcpy(buffer, m_buf, length);
- m_buf += length;
+ std::memcpy(buffer, buffer_, length);
+ buffer_ += length;
}
*bytes = buffer;
}
@@ -173,10 +173,10 @@ class APACHE_GEODE_EXPORT DataInput {
*/
inline int32_t readInt32() {
_GEODE_CHECK_BUFFER_SIZE(4);
- int32_t tmp = *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
+ int32_t tmp = *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
return tmp;
}
@@ -186,14 +186,14 @@ class APACHE_GEODE_EXPORT DataInput {
inline int64_t readInt64() {
_GEODE_CHECK_BUFFER_SIZE(8);
int64_t tmp;
- tmp = *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
+ tmp = *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
return tmp;
}
@@ -396,33 +396,33 @@ class APACHE_GEODE_EXPORT DataInput {
* as readonly and modification of contents using this internal pointer
* has undefined behavior.
*/
- inline const uint8_t* currentBufferPosition() const { return m_buf; }
+ inline const uint8_t* currentBufferPosition() const { return buffer_; }
/** get the number of bytes read in the buffer */
- inline size_t getBytesRead() const { return m_buf - m_bufHead; }
+ inline size_t getBytesRead() const { return buffer_ - bufferHead_; }
/** get the number of bytes remaining to be read in the buffer */
inline size_t getBytesRemaining() const {
- return (m_bufLength - getBytesRead());
+ return (bufferLength_ - getBytesRead());
}
/** advance the cursor by given offset */
- inline void advanceCursor(size_t offset) { m_buf += offset; }
+ inline void advanceCursor(size_t offset) { buffer_ += offset; }
/** rewind the cursor by given offset */
- inline void rewindCursor(size_t offset) { m_buf -= offset; }
+ inline void rewindCursor(size_t offset) { buffer_ -= offset; }
/** reset the cursor to the start of buffer */
- inline void reset() { m_buf = m_bufHead; }
+ inline void reset() { buffer_ = bufferHead_; }
inline void setBuffer() {
- m_buf = currentBufferPosition();
- m_bufLength = getBytesRemaining();
+ buffer_ = currentBufferPosition();
+ bufferLength_ = getBytesRemaining();
}
- inline void resetPdx(size_t offset) { m_buf = m_bufHead + offset; }
+ inline void resetPdx(size_t offset) { buffer_ = bufferHead_ + offset; }
- inline size_t getPdxBytes() const { return m_bufLength; }
+ inline size_t getPdxBytes() const { return bufferLength_; }
static uint8_t* getBufferCopy(const uint8_t* from, size_t length) {
uint8_t* result;
@@ -432,7 +432,7 @@ class APACHE_GEODE_EXPORT DataInput {
return result;
}
- inline void reset(size_t offset) { m_buf = m_bufHead + offset; }
+ inline void reset(size_t offset) { buffer_ = bufferHead_ + offset; }
uint8_t* getBufferCopyFrom(const uint8_t* from, size_t length) {
uint8_t* result;
@@ -452,6 +452,12 @@ class APACHE_GEODE_EXPORT DataInput {
DataInput& operator=(DataInput&&) = default;
protected:
+ const uint8_t* buffer_;
+ const uint8_t* bufferHead_;
+ size_t bufferLength_;
+ Pool* pool_;
+ const CacheImpl* cache_;
+
/** constructor given a pre-allocated byte array with size */
DataInput(const uint8_t* buffer, size_t len, const CacheImpl* cache,
Pool* pool);
@@ -459,12 +465,6 @@ class APACHE_GEODE_EXPORT DataInput {
virtual const SerializationRegistry& getSerializationRegistry() const;
private:
- const uint8_t* m_buf;
- const uint8_t* m_bufHead;
- size_t m_bufLength;
- Pool* m_pool;
- const CacheImpl* m_cache;
-
std::shared_ptr<Serializable> readObjectInternal(int8_t typeId = -1);
template <typename mType>
@@ -502,21 +502,21 @@ class APACHE_GEODE_EXPORT DataInput {
inline char readPdxChar() { return static_cast<char>(readInt16()); }
- inline void _checkBufferSize(size_t size, int32_t line) {
- if ((m_bufLength - (m_buf - m_bufHead)) < size) {
+ virtual void _checkBufferSize(size_t size, int32_t line) {
+ if ((bufferLength_ - (buffer_ - bufferHead_)) < size) {
throw OutOfRangeException(
"DataInput: attempt to read beyond buffer at line " +
std::to_string(line) + ": available buffer size " +
- std::to_string(m_bufLength - (m_buf - m_bufHead)) +
+ std::to_string(bufferLength_ - (buffer_ - bufferHead_)) +
", attempted read of size " + std::to_string(size));
}
}
- inline int8_t readNoCheck() { return *(m_buf++); }
+ inline int8_t readNoCheck() { return *(buffer_++); }
inline int16_t readInt16NoCheck() {
- int16_t tmp = *(m_buf++);
- tmp = static_cast<int16_t>((tmp << 8) | *(m_buf++));
+ int16_t tmp = *(buffer_++);
+ tmp = static_cast<int16_t>((tmp << 8) | *(buffer_++));
return tmp;
}
@@ -605,7 +605,7 @@ class APACHE_GEODE_EXPORT DataInput {
value.assign(reinterpret_cast<const wchar_t*>(tmp.data()), tmp.length());
}
- Pool* getPool() const { return m_pool; }
+ Pool* getPool() const { return pool_; }
friend Cache;
friend CacheImpl;
diff --git a/cppcache/src/Connector.hpp b/cppcache/src/Connector.hpp
index 98ce3d469..f13042b3c 100644
--- a/cppcache/src/Connector.hpp
+++ b/cppcache/src/Connector.hpp
@@ -116,6 +116,11 @@ class Connector {
*/
virtual uint16_t getPort() = 0;
+ /**
+ * Returns the remote endpoint for this connection in the form host:port
+ */
+ virtual std::string getRemoteEndpoint() = 0;
+
/**
* Writes an array of a known size to the underlying output stream.
*
diff --git a/cppcache/src/DataInput.cpp b/cppcache/src/DataInput.cpp
index 3590dd917..0a1a07061 100644
--- a/cppcache/src/DataInput.cpp
+++ b/cppcache/src/DataInput.cpp
@@ -30,21 +30,21 @@ namespace client {
DataInput::DataInput(const uint8_t* buffer, size_t len, const CacheImpl* cache,
Pool* pool)
- : m_buf(buffer),
- m_bufHead(buffer),
- m_bufLength(len),
- m_pool(pool),
- m_cache(cache) {}
+ : buffer_(buffer),
+ bufferHead_(buffer),
+ bufferLength_(len),
+ pool_(pool),
+ cache_(cache) {}
std::shared_ptr<Serializable> DataInput::readObjectInternal(int8_t typeId) {
return getSerializationRegistry().deserialize(*this, typeId);
}
const SerializationRegistry& DataInput::getSerializationRegistry() const {
- return *m_cache->getSerializationRegistry();
+ return *cache_->getSerializationRegistry();
}
-Cache* DataInput::getCache() const { return m_cache->getCache(); }
+Cache* DataInput::getCache() const { return cache_->getCache(); }
template <class _Traits, class _Allocator>
void DataInput::readJavaModifiedUtf8(
@@ -63,7 +63,7 @@ void DataInput::readJavaModifiedUtf8(
uint16_t length = readInt16();
_GEODE_CHECK_BUFFER_SIZE(length);
value = internal::JavaModifiedUtf8::decode(
- reinterpret_cast<const char*>(m_buf), length);
+ reinterpret_cast<const char*>(buffer_), length);
advanceCursor(length);
}
template APACHE_GEODE_EXPLICIT_TEMPLATE_EXPORT void
diff --git a/cppcache/src/GetAllServersResponse.cpp
b/cppcache/src/GetAllServersResponse.cpp
index 444763488..8a7597e23 100644
--- a/cppcache/src/GetAllServersResponse.cpp
+++ b/cppcache/src/GetAllServersResponse.cpp
@@ -21,10 +21,10 @@ namespace geode {
namespace client {
void GetAllServersResponse::toData(DataOutput& output) const {
- int32_t numServers = static_cast<int32_t>(m_servers.size());
- output.writeInt(numServers);
- for (int32_t i = 0; i < numServers; i++) {
- output.writeObject(m_servers.at(i));
+ auto numServers = servers_.size();
+ output.writeInt(static_cast<int32_t>(numServers));
+ for (unsigned int i = 0; i < numServers; i++) {
+ output.writeObject(servers_.at(i));
}
}
void GetAllServersResponse::fromData(DataInput& input) {
@@ -33,7 +33,7 @@ void GetAllServersResponse::fromData(DataInput& input) {
for (int i = 0; i < numServers; i++) {
std::shared_ptr<ServerLocation> sLoc = std::make_shared<ServerLocation>();
sLoc->fromData(input);
- m_servers.push_back(sLoc);
+ servers_.push_back(sLoc);
}
}
diff --git a/cppcache/src/GetAllServersResponse.hpp
b/cppcache/src/GetAllServersResponse.hpp
index 73bf83e12..c01071607 100644
--- a/cppcache/src/GetAllServersResponse.hpp
+++ b/cppcache/src/GetAllServersResponse.hpp
@@ -35,23 +35,27 @@ namespace client {
class GetAllServersResponse : public internal::DataSerializableFixedId_t<
internal::DSFid::GetAllServersResponse> {
- std::vector<std::shared_ptr<ServerLocation> > m_servers;
-
public:
static std::shared_ptr<Serializable> create() {
return std::make_shared<GetAllServersResponse>();
}
GetAllServersResponse() : Serializable() {}
+ explicit GetAllServersResponse(
+ std::vector<std::shared_ptr<ServerLocation> > servers)
+ : Serializable(), servers_(servers) {}
void toData(DataOutput& output) const override;
void fromData(DataInput& input) override;
size_t objectSize() const override {
- return sizeof(GetAllServersResponse) + m_servers.capacity();
+ return sizeof(GetAllServersResponse) + servers_.capacity();
}
std::vector<std::shared_ptr<ServerLocation> > getServers() {
- return m_servers;
+ return servers_;
}
~GetAllServersResponse() override = default;
+
+ private:
+ std::vector<std::shared_ptr<ServerLocation> > servers_;
};
} // namespace client
diff --git a/cppcache/src/StreamDataInput.cpp b/cppcache/src/StreamDataInput.cpp
new file mode 100644
index 000000000..3944689b1
--- /dev/null
+++ b/cppcache/src/StreamDataInput.cpp
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StreamDataInput.hpp"
+
+#include <geode/DataInput.hpp>
+
+#include "Utils.hpp"
+#include "util/Log.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+constexpr size_t kBufferSize{3000};  // bytes requested per receive call
+// Starts with an empty buffer; data is pulled from `connector` on demand.
+StreamDataInput::StreamDataInput(std::chrono::milliseconds timeout,
+                                 std::unique_ptr<Connector> connector,
+                                 const CacheImpl* cache, Pool* pool)
+    : DataInput(nullptr, 0, cache, pool),
+      connector_{std::move(connector)},
+      remainingTimeBeforeTimeout_{timeout} {}
+
+// Receives from the connector until `size` unread bytes are buffered; throws
+// TimeoutException once the time budget given at construction is used up.
+void StreamDataInput::readDataIfNotAvailable(size_t size) {
+  char buff[kBufferSize];
+  while (getBytesRemaining() < size) {
+    // Checked first so data that arrived just in time is not thrown away.
+    if (remainingTimeBeforeTimeout_ <= std::chrono::milliseconds::zero()) {
+      throw(TimeoutException(std::string("Timeout when receiving from ")
+                                 .append(connector_->getRemoteEndpoint())));
+    }
+
+    // steady_clock is monotonic; wall-clock jumps cannot skew the budget.
+    const auto start = std::chrono::steady_clock::now();
+    const auto receivedLength = connector_->receive_nothrowiftimeout(
+        buff, kBufferSize, remainingTimeBeforeTimeout_);
+    const auto timeSpent =
+        std::chrono::duration_cast<decltype(remainingTimeBeforeTimeout_)>(
+            std::chrono::steady_clock::now() - start);
+    remainingTimeBeforeTimeout_ -= timeSpent;
+    // %zu matches size_t; counts are cast to match %lld on every platform.
+    LOGDEBUG(
+        "received %zu bytes from %s: %s, time spent: "
+        "%lld millisecs, time remaining before timeout: %lld millisecs",
+        receivedLength, connector_->getRemoteEndpoint().c_str(),
+        Utils::convertBytesToString(reinterpret_cast<uint8_t*>(buff),
+                                    receivedLength)
+            .c_str(),
+        static_cast<long long>(timeSpent.count()),
+        static_cast<long long>(remainingTimeBeforeTimeout_.count()));
+    if (receivedLength == 0) continue;  // nothing to append
+    // Growing the vector may reallocate: rebase the DataInput cursor.
+    const auto newLength = bufferLength_ + receivedLength;
+    const auto currentPosition = getBytesRead();
+    streamBuf_.resize(newLength);
+    memcpy(streamBuf_.data() + bufferLength_, buff, receivedLength);
+    bufferHead_ = streamBuf_.data();
+    buffer_ = bufferHead_ + currentPosition;
+    bufferLength_ = newLength;
+  }
+}
+
+} // namespace client
+} // namespace geode
+} // namespace apache
diff --git a/cppcache/src/GetAllServersResponse.cpp
b/cppcache/src/StreamDataInput.hpp
similarity index 50%
copy from cppcache/src/GetAllServersResponse.cpp
copy to cppcache/src/StreamDataInput.hpp
index 444763488..e9266970c 100644
--- a/cppcache/src/GetAllServersResponse.cpp
+++ b/cppcache/src/StreamDataInput.hpp
@@ -14,29 +14,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "GetAllServersResponse.hpp"
+
+#pragma once
+
+#ifndef GEODE_STREAMDATAINPUT_H_
+#define GEODE_STREAMDATAINPUT_H_
+
+#include <chrono>
+
+#include "Connector.hpp"
+#include "geode/DataInput.hpp"
namespace apache {
namespace geode {
namespace client {
-void GetAllServersResponse::toData(DataOutput& output) const {
- int32_t numServers = static_cast<int32_t>(m_servers.size());
- output.writeInt(numServers);
- for (int32_t i = 0; i < numServers; i++) {
- output.writeObject(m_servers.at(i));
- }
-}
-void GetAllServersResponse::fromData(DataInput& input) {
- int numServers = input.readInt32();
- LOGFINER("GetAllServersResponse::fromData length = %d ", numServers);
- for (int i = 0; i < numServers; i++) {
- std::shared_ptr<ServerLocation> sLoc = std::make_shared<ServerLocation>();
- sLoc->fromData(input);
- m_servers.push_back(sLoc);
+class Connector;
+
+/**
+ * Provides the same functionality as its parent class, but
+ * the data is read on demand from a socket connection
+ * instead of from a buffer passed in by the caller.
+ */
+class APACHE_GEODE_EXPORT StreamDataInput : public DataInput {
+ public:
+ StreamDataInput(std::chrono::milliseconds timeout,
+ std::unique_ptr<Connector> connector, const CacheImpl* cache,
+ Pool* pool);
+
+ protected:
+ void _checkBufferSize(size_t size, int32_t /* line */) override {
+ readDataIfNotAvailable(size);
}
-}
+ void readDataIfNotAvailable(size_t size);
+
+ private:
+ std::unique_ptr<Connector> connector_;
+ std::chrono::milliseconds remainingTimeBeforeTimeout_;
+ std::vector<uint8_t> streamBuf_;
+};
} // namespace client
} // namespace geode
} // namespace apache
+
+#endif // GEODE_STREAMDATAINPUT_H_
diff --git a/cppcache/src/TcpConn.cpp b/cppcache/src/TcpConn.cpp
index e2798705e..1c0e916e5 100644
--- a/cppcache/src/TcpConn.cpp
+++ b/cppcache/src/TcpConn.cpp
@@ -17,7 +17,6 @@
#include "TcpConn.hpp"
-#include <iomanip>
#include <iostream>
#include <boost/optional.hpp>
@@ -293,6 +292,11 @@ size_t TcpConn::send(const char *buff, const size_t len,
// Return the local port for this TCP connection.
uint16_t TcpConn::getPort() { return socket_.local_endpoint().port(); }
+std::string TcpConn::getRemoteEndpoint() {  // "<address>:<port>" of the peer
+  const auto peer = socket_.remote_endpoint();
+  return peer.address().to_string() + ":" + std::to_string(peer.port());
+}
+
void TcpConn::connect(boost::asio::ip::tcp::resolver::results_type r,
std::chrono::microseconds timeout) {
boost::optional<boost::system::error_code> connect_result;
diff --git a/cppcache/src/TcpConn.hpp b/cppcache/src/TcpConn.hpp
index 028214fa9..9cbec7bbc 100644
--- a/cppcache/src/TcpConn.hpp
+++ b/cppcache/src/TcpConn.hpp
@@ -38,6 +38,8 @@ class TcpConn : public Connector {
uint16_t getPort() override final;
+ std::string getRemoteEndpoint() override final;
+
protected:
boost::asio::io_context io_context_;
boost::asio::ip::tcp::socket socket_;
diff --git a/cppcache/src/ThinClientLocatorHelper.cpp
b/cppcache/src/ThinClientLocatorHelper.cpp
index d896d6561..4f6bb3963 100644
--- a/cppcache/src/ThinClientLocatorHelper.cpp
+++ b/cppcache/src/ThinClientLocatorHelper.cpp
@@ -22,8 +22,6 @@
#include <boost/thread/lock_types.hpp>
-#include <geode/DataInput.hpp>
-#include <geode/DataOutput.hpp>
#include <geode/SystemProperties.hpp>
#include "CacheImpl.hpp"
@@ -35,6 +33,7 @@
#include "LocatorListResponse.hpp"
#include "QueueConnectionRequest.hpp"
#include "QueueConnectionResponse.hpp"
+#include "StreamDataInput.hpp"
#include "TcpConn.hpp"
#include "TcpSslConn.hpp"
#include "TcrConnectionManager.hpp"
@@ -47,7 +46,6 @@ namespace apache {
namespace geode {
namespace client {
-const size_t BUFF_SIZE = 3000;
const size_t DEFAULT_CONNECTION_RETRIES = 3;
ThinClientLocatorHelper::ThinClientLocatorHelper(
@@ -143,20 +141,10 @@ std::shared_ptr<Serializable>
ThinClientLocatorHelper::sendRequest(
if (sentLength <= 0) {
return nullptr;
}
- char buff[BUFF_SIZE];
- const auto receivedLength = conn->receive(buff,
m_poolDM->getReadTimeout());
- if (!receivedLength) {
- return nullptr;
- }
-
- LOGDEBUG("%s(%p): received %d bytes from locator: %s", __GNFN__, this,
- receivedLength,
- Utils::convertBytesToString(reinterpret_cast<uint8_t*>(buff),
- receivedLength)
- .c_str());
- auto di = m_poolDM->getConnectionManager().getCacheImpl()->createDataInput(
- reinterpret_cast<uint8_t*>(buff), receivedLength);
+ StreamDataInput di(m_poolDM->getReadTimeout(), std::move(conn),
+ m_poolDM->getConnectionManager().getCacheImpl(),
+ nullptr);
if (di.read() == REPLY_SSL_ENABLED && !sys_prop.sslEnabled()) {
LOGERROR(
diff --git a/cppcache/test/CMakeLists.txt b/cppcache/test/CMakeLists.txt
index dd1577d17..501af1c2f 100644
--- a/cppcache/test/CMakeLists.txt
+++ b/cppcache/test/CMakeLists.txt
@@ -53,6 +53,7 @@ add_executable(apache-geode_unittests
QueueConnectionRequestTest.cpp
RegionAttributesFactoryTest.cpp
SerializableCreateTests.cpp
+ StreamDataInputTest.cpp
StringPrefixPartitionResolverTest.cpp
StructSetTest.cpp
TcrConnectionTest.cpp
diff --git a/cppcache/test/StreamDataInputTest.cpp
b/cppcache/test/StreamDataInputTest.cpp
new file mode 100644
index 000000000..c115e533c
--- /dev/null
+++ b/cppcache/test/StreamDataInputTest.cpp
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-matchers.h>
+
+#include <gtest/gtest.h>
+
+#include "CacheImpl.hpp"
+#include "Connector.hpp"
+#include "GetAllServersResponse.hpp"
+#include "ServerLocation.hpp"
+#include "StreamDataInput.hpp"
+#include "geode/DataOutput.hpp"
+#include "mock/ConnectorMock.hpp"
+
+namespace {
+
+using apache::geode::client::CacheImpl;
+using apache::geode::client::Connector;
+using apache::geode::client::ConnectorMock;
+using apache::geode::client::DataOutput;
+using apache::geode::client::GetAllServersResponse;
+using apache::geode::client::Serializable;
+using apache::geode::client::ServerLocation;
+using apache::geode::client::StreamDataInput;
+using apache::geode::client::TimeoutException;
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::Eq;
+using ::testing::Return;
+using ::testing::SetArrayArgument;
+using ::testing::SizeIs;
+
+constexpr size_t kReadBuffSize = 3000;
+constexpr size_t kStreamBufferSize = 10000;
+// Action: sleeps for `milliseconds` ms and then makes the mock return 0.
+ACTION_P(WaitMs, milliseconds) {
+  std::this_thread::sleep_for(std::chrono::milliseconds{milliseconds});
+  return 0;
+}
+
+// Verifies that an object spanning three receive calls is deserialized.
+TEST(StreamDataInputTest, ObjectSizeGreaterThanReadBufferSize) {
+  std::unique_ptr<ConnectorMock> connector =
+      std::unique_ptr<ConnectorMock>(new ConnectorMock());
+
+  unsigned int numServers = 100;
+  std::vector<std::shared_ptr<ServerLocation> > servers(numServers);
+  for (unsigned int i = 0; i < numServers; i++) {
+    servers[i] = std::make_shared<ServerLocation>(
+        std::string("this.is.a.quite.long.hostname.and.the.reason.is.that.it."
+                    "is.used.for.testing:") += std::to_string(2000 + i));
+  }
+  GetAllServersResponse getAllServersResponse(servers);
+
+  auto cache =
+      std::make_shared<CacheImpl>(nullptr, nullptr, false, false, nullptr);
+  auto dataOutput = cache->createDataOutput();
+  getAllServersResponse.toData(dataOutput);
+
+  auto buffer = dataOutput.getBuffer();
+  auto dataOutputBufferLength = dataOutput.getBufferLength();
+  auto streamBufferLength = dataOutputBufferLength + 2;
+
+  // The expectations below split the stream into two full chunks plus a
+  // non-empty remainder that must itself fit into one read buffer.
+  ASSERT_GT(streamBufferLength, 2 * kReadBuffSize);
+  ASSERT_LE(streamBufferLength, 3 * kReadBuffSize);
+
+  // Gossip header
+  uint8_t streamBuffer[kStreamBufferSize];
+  streamBuffer[0] = 1;
+  streamBuffer[1] = 0xd6;
+  memcpy(streamBuffer + 2, buffer, dataOutputBufferLength);
+
+  auto timeout = std::chrono::milliseconds(1000);
+  EXPECT_CALL(*connector, getRemoteEndpoint())
+      .WillRepeatedly(Return("locator:9999"));
+  EXPECT_CALL(*connector, receive_nothrowiftimeout(_, _, _))
+      .WillOnce(DoAll(
+          SetArrayArgument<0>(streamBuffer, streamBuffer + kReadBuffSize),
+          Return(kReadBuffSize)))
+      .WillOnce(DoAll(SetArrayArgument<0>(streamBuffer + kReadBuffSize,
+                                          streamBuffer + 2 * kReadBuffSize),
+                      Return(kReadBuffSize)))
+      .WillOnce(DoAll(SetArrayArgument<0>(streamBuffer + 2 * kReadBuffSize,
+                                          &streamBuffer[streamBufferLength]),
+                      Return(streamBufferLength - (2 * kReadBuffSize))));
+
+  StreamDataInput streamDataInput(timeout, std::move(connector), cache.get(),
+                                  nullptr);
+  auto object = streamDataInput.readObject();
+
+  // A failed cast must fail the test instead of dereferencing null below.
+  auto response = std::dynamic_pointer_cast<GetAllServersResponse>(object);
+  ASSERT_TRUE(response != nullptr);
+  ASSERT_THAT(response->getServers(), SizeIs(servers.size()));
+  for (unsigned int i = 0; i < servers.size(); i++) {
+    ASSERT_THAT(response->getServers()[i]->getEpString(),
+                Eq(servers[i]->getEpString()));
+  }
+}
+
+// A receive that outlasts the read timeout must raise TimeoutException.
+TEST(StreamDataInputTest, TimeoutWhenReading) {
+  std::unique_ptr<ConnectorMock> mockConnector{new ConnectorMock()};
+  const auto cacheImpl =
+      std::make_shared<CacheImpl>(nullptr, nullptr, false, false, nullptr);
+
+  EXPECT_CALL(*mockConnector, getRemoteEndpoint())
+      .WillRepeatedly(Return("locator:9999"));
+  // The mocked receive blocks for 2 ms and delivers no bytes.
+  EXPECT_CALL(*mockConnector, receive_nothrowiftimeout(_, _, _))
+      .WillOnce(WaitMs(2));
+
+  const std::chrono::milliseconds readTimeout{1};
+  StreamDataInput input(readTimeout, std::move(mockConnector),
+                        cacheImpl.get(), nullptr);
+
+  ASSERT_THROW(input.readObject(), TimeoutException);
+}
+
+} // namespace
diff --git a/cppcache/src/GetAllServersResponse.cpp
b/cppcache/test/mock/ConnectorMock.hpp
similarity index 58%
copy from cppcache/src/GetAllServersResponse.cpp
copy to cppcache/test/mock/ConnectorMock.hpp
index 444763488..7525699ba 100644
--- a/cppcache/src/GetAllServersResponse.cpp
+++ b/cppcache/test/mock/ConnectorMock.hpp
@@ -14,29 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "GetAllServersResponse.hpp"
+
+#ifndef GEODE_CONNECTORMOCK_H_
+#define GEODE_CONNECTORMOCK_H_
+
+#include <gmock/gmock.h>
+
+#include "Connector.hpp"
namespace apache {
namespace geode {
namespace client {
-
-void GetAllServersResponse::toData(DataOutput& output) const {
- int32_t numServers = static_cast<int32_t>(m_servers.size());
- output.writeInt(numServers);
- for (int32_t i = 0; i < numServers; i++) {
- output.writeObject(m_servers.at(i));
- }
-}
-void GetAllServersResponse::fromData(DataInput& input) {
- int numServers = input.readInt32();
- LOGFINER("GetAllServersResponse::fromData length = %d ", numServers);
- for (int i = 0; i < numServers; i++) {
- std::shared_ptr<ServerLocation> sLoc = std::make_shared<ServerLocation>();
- sLoc->fromData(input);
- m_servers.push_back(sLoc);
+class ConnectorMock : public Connector {
+ public:
+ ConnectorMock(){
}
-}
+ MOCK_METHOD3(receive, size_t(char *b, size_t len,
+ std::chrono::milliseconds timeout));
+
+ MOCK_METHOD0(getPort, uint16_t());
+
+ MOCK_METHOD0(getRemoteEndpoint, std::string());
+
+ MOCK_METHOD3(send, size_t(const char *b, size_t len,
+ std::chrono::milliseconds timeout));
+
+ MOCK_METHOD3(receive_nothrowiftimeout, size_t(
+ char *b, size_t len, std::chrono::milliseconds timeout));
+
+
+};
} // namespace client
} // namespace geode
} // namespace apache
+
+#endif // GEODE_CONNECTORMOCK_H_