This is an automated email from the ASF dual-hosted git repository.
mmartell pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push:
new dcb1e9e Gnmsg parse server handshake response (#939)
dcb1e9e is described below
commit dcb1e9e7d22b517e84c5a8bc44fe9d5d19a103c9
Author: Blake Bender <[email protected]>
AuthorDate: Mon Mar 7 07:55:03 2022 -0800
Gnmsg parse server handshake response (#939)
* Add timestamp, tid, direction, and Type to server handshake request
* Parsing server handshake response in gnmsg
- Added logging to TcrConnection to dump handshake response bytes
- Cleaned up some code in TcrConnection::initTcrConnection, to help
clarify what's going on
- Renamed TcrConnection member variables to conform to standard
---
cppcache/src/TcrConnection.cpp | 289 ++++++++++++++++--------------
cppcache/src/TcrConnection.hpp | 34 ++--
tools/gnmsg/client_message_decoder.py | 25 +--
tools/gnmsg/gnmsg.py | 18 +-
tools/gnmsg/handshake_acceptance_codes.py | 26 +++
tools/gnmsg/handshake_decoder.py | 121 +++++++++++--
tools/gnmsg/read_values.py | 11 +-
tools/gnmsg/server_message_decoder.py | 9 +-
8 files changed, 343 insertions(+), 190 deletions(-)
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 3fb09ba..d35c94d 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -26,6 +26,7 @@
#include "ClientProxyMembershipID.hpp"
#include "Connector.hpp"
#include "DistributedSystemImpl.hpp"
+#include "FunctionMacros.hpp"
#include "TcpConn.hpp"
#include "TcpSslConn.hpp"
#include "TcrConnectionManager.hpp"
@@ -35,6 +36,8 @@
#include "Utils.hpp"
#include "Version.hpp"
+INIT_GNFN("TcrConnection")
+
#define throwException(ex) \
do { \
LOGFINEST(ex.getName() + ": " + ex.getMessage()); \
@@ -86,16 +89,16 @@ const int64_t INITIAL_CONNECTION_ID = 26739;
struct FinalizeProcessChunk {
private:
- apache::geode::client::TcrMessage& m_reply;
- uint16_t m_endpointMemId;
+ apache::geode::client::TcrMessage& reply_;
+ uint16_t endpointMemId_;
public:
FinalizeProcessChunk(apache::geode::client::TcrMessageReply& reply,
uint16_t endpointMemId)
- : m_reply(reply), m_endpointMemId(endpointMemId) {}
+ : reply_(reply), endpointMemId_(endpointMemId) {}
~FinalizeProcessChunk() noexcept(false) {
// Enqueue a nullptr chunk indicating a wait for processing to complete.
- m_reply.processChunk(std::vector<uint8_t>(), 0, m_endpointMemId);
+ reply_.processChunk(std::vector<uint8_t>(), 0, endpointMemId_);
}
};
} // namespace
@@ -106,44 +109,44 @@ namespace client {
TcrConnection::TcrConnection(const TcrConnectionManager& connectionManager)
: connectionId(0),
- m_connectionManager(connectionManager),
- m_expiryTimeVariancePercentage{expiryTimeVariancePercentage()},
- m_hasServerQueue(NON_REDUNDANT_SERVER),
- m_queueSize(0),
- m_port(0),
+ connectionManager_(connectionManager),
+ expiryTimeVariancePercentage_{expiryTimeVariancePercentage()},
+ hasServerQueue_(NON_REDUNDANT_SERVER),
+ queueSize_(0),
+ port_(0),
chunks_process_semaphore_(0),
- m_isBeingUsed(false),
- m_isUsed(0),
- m_poolDM(nullptr) {}
+ isBeingUsed_(false),
+ isUsed_(0),
+ poolDM_(nullptr) {}
bool TcrConnection::initTcrConnection(
std::shared_ptr<TcrEndpoint> endpointObj,
synchronized_set<std::unordered_set<uint16_t>>& ports,
bool isClientNotification, bool isSecondary,
std::chrono::microseconds connectTimeout) {
- m_endpointObj = endpointObj;
- m_poolDM = dynamic_cast<ThinClientPoolDM*>(m_endpointObj->getPoolHADM());
- m_hasServerQueue = NON_REDUNDANT_SERVER;
- m_queueSize = 0;
- m_lastAccessed = m_creationTime = std::chrono::steady_clock::now();
+ endpointObj_ = endpointObj;
+ poolDM_ = dynamic_cast<ThinClientPoolDM*>(endpointObj_->getPoolHADM());
+ hasServerQueue_ = NON_REDUNDANT_SERVER;
+ queueSize_ = 0;
+ lastAccessed_ = creationTime_ = std::chrono::steady_clock::now();
connectionId = INITIAL_CONNECTION_ID;
- auto cacheImpl = m_poolDM->getConnectionManager().getCacheImpl();
+ auto cacheImpl = poolDM_->getConnectionManager().getCacheImpl();
const auto& distributedSystem = cacheImpl->getDistributedSystem();
const auto& sysProp = distributedSystem.getSystemProperties();
bool isPool = false;
- m_isBeingUsed = false;
+ isBeingUsed_ = false;
// Precondition:
// 1. isSecondary ==> isClientNotification
// Create TcpConn object which manages a socket connection with the endpoint.
if (endpointObj && endpointObj->getPoolHADM()) {
- createConnection(m_endpointObj->name().c_str(), connectTimeout,
+ createConnection(endpointObj_->name().c_str(), connectTimeout,
static_cast<int32_t>(
endpointObj->getPoolHADM()->getSocketBufferSize()));
isPool = true;
} else {
- createConnection(m_endpointObj->name().c_str(), connectTimeout,
+ createConnection(endpointObj_->name().c_str(), connectTimeout,
sysProp.maxSocketBufferSize());
}
@@ -169,8 +172,8 @@ bool TcrConnection::initTcrConnection(
// Send byte REPLY_OK = (byte)58;
if (!isClientNotification) {
- m_port = m_conn->getPort();
- ports.insert(m_port);
+ port_ = conn_->getPort();
+ ports.insert(port_);
} else {
auto&& lock = ports.make_lock();
handShakeMsg.writeInt(static_cast<int32_t>(ports.size()));
@@ -224,8 +227,8 @@ bool TcrConnection::initTcrConnection(
bool tmpIsSecurityOn = nullptr != cacheImpl->getAuthInitialize();
- if (m_endpointObj) {
- tmpIsSecurityOn = tmpIsSecurityOn || m_endpointObj->isMultiUserMode();
+ if (endpointObj_) {
+ tmpIsSecurityOn = tmpIsSecurityOn || endpointObj_->isMultiUserMode();
}
LOGDEBUG(
@@ -233,8 +236,8 @@ bool TcrConnection::initTcrConnection(
"%d ",
tmpIsSecurityOn, isNotificationChannel);
bool doIneedToSendCreds = true;
- if (isNotificationChannel && m_endpointObj &&
- this->m_endpointObj->isMultiUserMode()) {
+ if (isNotificationChannel && endpointObj_ &&
+ this->endpointObj_->isMultiUserMode()) {
tmpIsSecurityOn = false;
doIneedToSendCreds = false;
}
@@ -264,7 +267,7 @@ bool TcrConnection::initTcrConnection(
const auto& tmpAuthIniSecurityProperties =
authInitialize->getCredentials(tmpSecurityProperties,
- m_endpointObj->name().c_str());
+ endpointObj_->name().c_str());
LOGFINER("TcrConnection: after getCredentials ");
credentials = tmpAuthIniSecurityProperties;
}
@@ -295,62 +298,74 @@ bool TcrConnection::initTcrConnection(
endpointObj->name().c_str(),
isClientNotification ? (isSecondary ? "secondary " : "primary ") :
"",
isClientNotification ? "subscription" : "client");
- LOGDEBUG(std::string("Handshake bytes: (") + std::to_string(msgLength) +
- "): " + Utils::convertBytesToString(data, msgLength));
+ LOGDEBUG("%s(%p): Handshake bytes: (%d): %s", __GNFN__, this, msgLength,
+ Utils::convertBytesToString(data, msgLength).c_str());
+
ConnErrType error = sendData(data, msgLength, connectTimeout);
if (error == CONN_NOERR) {
+ std::vector<int8_t> recdBytes;
+
auto acceptanceCode = readHandshakeData(1, connectTimeout);
+ recdBytes.push_back(acceptanceCode[0]);
- LOGDEBUG(" Handshake: Got Accept Code %d", acceptanceCode[0]);
- /* adongre */
if (acceptanceCode[0] == REPLY_SSL_ENABLED && !sysProp.sslEnabled()) {
LOGERROR("SSL is enabled on server, enable SSL in client as well");
AuthenticationRequiredException ex(
"SSL is enabled on server, enable SSL in client as well");
- m_conn.reset();
+ conn_.reset();
throwException(ex);
}
auto serverQueueStatus = readHandshakeData(1, connectTimeout);
+ recdBytes.push_back(serverQueueStatus[0]);
// TESTING: Durable clients - set server queue status.
// 0 - Non-Redundant , 1- Redundant , 2- Primary
if (serverQueueStatus[0] == 1) {
- m_hasServerQueue = REDUNDANT_SERVER;
+ hasServerQueue_ = REDUNDANT_SERVER;
} else if (serverQueueStatus[0] == 2) {
- m_hasServerQueue = PRIMARY_SERVER;
+ hasServerQueue_ = PRIMARY_SERVER;
} else {
- m_hasServerQueue = NON_REDUNDANT_SERVER;
+ hasServerQueue_ = NON_REDUNDANT_SERVER;
}
auto queueSizeMsg = readHandshakeData(4, connectTimeout);
- auto dataInput = cacheImpl->createDataInput(
- reinterpret_cast<const uint8_t*>(queueSizeMsg.data()),
- queueSizeMsg.size());
- int32_t queueSize = 0;
- queueSize = dataInput.readInt32();
- m_queueSize = queueSize > 0 ? queueSize : 0;
+ recdBytes.insert(std::end(recdBytes), std::begin(queueSizeMsg),
+ std::end(queueSizeMsg));
+ queueSize_ = static_cast<int32_t>(queueSizeMsg[0]) << 24 |
+ static_cast<int32_t>(queueSizeMsg[1]) << 16 |
+ static_cast<int32_t>(queueSizeMsg[2]) << 8 |
+ static_cast<int32_t>(queueSizeMsg[3]);
+ queueSize_ = queueSize_ > 0 ? queueSize_ : 0;
- m_endpointObj->setServerQueueStatus(m_hasServerQueue, m_queueSize);
+ endpointObj_->setServerQueueStatus(hasServerQueue_, queueSize_);
- ////////////////////////// Set Pool Specific Q Size only when
+ ////////////////////////// Set Pool Specific Q Size when
///////////////////////////////////
- ////////////////////////// 1. SereverQStatus = Primary or
+ ////////////////////////// 1. ServerQStatus is Primary or
///////////////////////////////////
- ////////////////////////// 2. SereverQStatus = Non-Redundant and
+ ////////////////////////// 2. ServerQStatus is Non-Redundant but
///////////////////////////////////
- ////////////////////////// 3. Only when handshake is for subscription
+ ////////////////////////// 3. ONLY when handshake is for subscription
///////////////////////////////////
- if ((m_hasServerQueue == PRIMARY_SERVER ||
- m_hasServerQueue == NON_REDUNDANT_SERVER) &&
+ if ((hasServerQueue_ == PRIMARY_SERVER ||
+ hasServerQueue_ == NON_REDUNDANT_SERVER) &&
isClientNotification) {
- m_poolDM->setPrimaryServerQueueSize(queueSize);
+ poolDM_->setPrimaryServerQueueSize(queueSize_);
}
if (!isClientNotification) {
// Read the DistributedMember object
auto recvMsgLen = readHandshakeArraySize(connectTimeout);
+ recdBytes.push_back((recvMsgLen & 0xff000000) >> 24);
+ recdBytes.push_back((recvMsgLen & 0x00ff0000) >> 16);
+ recdBytes.push_back((recvMsgLen & 0x0000ff00) >> 8);
+ recdBytes.push_back(recvMsgLen & 0x000000ff);
+
auto recvMessage = readHandshakeData(recvMsgLen, connectTimeout);
+ recdBytes.insert(std::end(recdBytes), std::begin(recvMessage),
+ std::end(recvMessage));
+
// If the distributed member has not been set yet, set it.
if (getEndpointObject()->getDistributedMemberID() == 0) {
LOGDEBUG("Deserializing distributed member Id");
@@ -366,75 +381,88 @@ bool TcrConnection::initTcrConnection(
}
auto recvMsgLenBytes = readHandshakeData(2, connectTimeout);
- auto dataInput3 = m_connectionManager.getCacheImpl()->createDataInput(
- reinterpret_cast<const uint8_t*>(recvMsgLenBytes.data()),
- recvMsgLenBytes.size());
- uint16_t recvMsgLen2 = dataInput3.readInt16();
+ uint16_t recvMsgLen2 = static_cast<int16_t>(recvMsgLenBytes[0]) << 8 |
+ static_cast<int16_t>(recvMsgLenBytes[1]);
+ recdBytes.insert(std::end(recdBytes), std::begin(recvMsgLenBytes),
+ std::end(recvMsgLenBytes));
+
auto recvMessage = readHandshakeData(recvMsgLen2, connectTimeout);
+ recdBytes.insert(std::end(recdBytes), std::begin(recvMessage),
+ std::end(recvMessage));
if (!isClientNotification) {
auto deltaEnabledMsg = readHandshakeData(1, connectTimeout);
- auto di = m_connectionManager.getCacheImpl()->createDataInput(
- reinterpret_cast<const uint8_t*>(deltaEnabledMsg.data()), 1);
- ThinClientBaseDM::setDeltaEnabledOnServer(di.readBoolean());
+ recdBytes.push_back(deltaEnabledMsg[0]);
+ ThinClientBaseDM::setDeltaEnabledOnServer(deltaEnabledMsg[0] ? true
+ : false);
}
+ LOGDEBUG(
+ "%s(%p): isClientNotification=%s, Handshake response bytes: (%d) %s",
+ __GNFN__, this, isClientNotification ? "true" : "false",
+ recdBytes.size(),
+ Utils::convertBytesToString(recdBytes.data(), recdBytes.size())
+ .c_str());
+
switch (acceptanceCode[0]) {
case REPLY_OK:
case SUCCESSFUL_SERVER_TO_CLIENT:
LOGFINER("Handshake reply: %u,%u,%u", acceptanceCode[0],
serverQueueStatus[0], recvMsgLen2);
- if (isClientNotification) readHandshakeInstantiatorMsg(connectTimeout);
+ if (isClientNotification) {
+ readHandshakeInstantiatorMsg(connectTimeout);
+ }
+
break;
case REPLY_AUTHENTICATION_FAILED: {
AuthenticationFailedException ex(
reinterpret_cast<char*>(recvMessage.data()));
- m_conn.reset();
+ conn_.reset();
throwException(ex);
}
case REPLY_AUTHENTICATION_REQUIRED: {
AuthenticationRequiredException ex(
reinterpret_cast<char*>(recvMessage.data()));
- m_conn.reset();
+ conn_.reset();
throwException(ex);
}
case REPLY_DUPLICATE_DURABLE_CLIENT: {
DuplicateDurableClientException ex(
reinterpret_cast<char*>(recvMessage.data()));
- m_conn.reset();
+ conn_.reset();
throwException(ex);
}
case REPLY_REFUSED:
case REPLY_INVALID:
case UNSUCCESSFUL_SERVER_TO_CLIENT: {
LOGERROR("Handshake rejected by server[%s]: %s",
- m_endpointObj->name().c_str(),
+ endpointObj_->name().c_str(),
reinterpret_cast<char*>(recvMessage.data()));
auto message = std::string("TcrConnection::TcrConnection: ") +
"Handshake rejected by server: " +
reinterpret_cast<char*>(recvMessage.data());
CacheServerException ex(message);
- m_conn.reset();
+ conn_.reset();
throw ex;
}
default: {
LOGERROR(
"Unknown error[%d] received from server [%s] in handshake: "
"%s",
- acceptanceCode[0], m_endpointObj->name().c_str(),
+ acceptanceCode[0], endpointObj_->name().c_str(),
recvMessage.data());
auto message =
std::string("TcrConnection::TcrConnection: Unknown error") +
" received from server in handshake: " +
reinterpret_cast<char*>(recvMessage.data());
MessageException ex(message);
- m_conn.reset();
+ conn_.reset();
throw ex;
}
}
} else {
- m_conn.reset();
+ conn_.reset();
if (error & CONN_TIMEOUT) {
throw TimeoutException(
"TcrConnection::TcrConnection: "
@@ -452,9 +480,9 @@ bool TcrConnection::initTcrConnection(
//---if pool in not in multiuser node
//---or old endpoint case.
- if (this->m_endpointObj && !isNotificationChannel && tmpIsSecurityOn &&
- (!isPool || !this->m_endpointObj->isMultiUserMode())) {
- // this->m_endpointObj->authenticateEndpoint(this);
+ if (this->endpointObj_ && !isNotificationChannel && tmpIsSecurityOn &&
+ (!isPool || !this->endpointObj_->isMultiUserMode())) {
+ // this->endpointObj_->authenticateEndpoint(this);
return true;
}
@@ -465,26 +493,26 @@ void TcrConnection::createConnection(const std::string&
address,
std::chrono::microseconds connectTimeout,
int32_t maxBuffSizePool) {
Connector* socket = nullptr;
- auto& systemProperties = m_connectionManager.getCacheImpl()
+ auto& systemProperties = connectionManager_.getCacheImpl()
->getDistributedSystem()
.getSystemProperties();
if (systemProperties.sslEnabled()) {
- const auto& sniHostname = m_poolDM->getSniProxyHost();
+ const auto& sniHostname = poolDM_->getSniProxyHost();
if (sniHostname.empty()) {
- m_conn.reset(new TcpSslConn(address, connectTimeout, maxBuffSizePool,
- systemProperties.sslTrustStore(),
- systemProperties.sslKeyStore(),
- systemProperties.sslKeystorePassword()));
+ conn_.reset(new TcpSslConn(address, connectTimeout, maxBuffSizePool,
+ systemProperties.sslTrustStore(),
+ systemProperties.sslKeyStore(),
+ systemProperties.sslKeystorePassword()));
} else {
- const auto sniPort = m_poolDM->getSniProxyPort();
- m_conn.reset(new TcpSslConn(
+ const auto sniPort = poolDM_->getSniProxyPort();
+ conn_.reset(new TcpSslConn(
address, connectTimeout, maxBuffSizePool, sniHostname, sniPort,
systemProperties.sslTrustStore(), systemProperties.sslKeyStore(),
systemProperties.sslKeystorePassword()));
}
} else {
- m_conn.reset(new TcpConn(address, connectTimeout, maxBuffSizePool));
+ conn_.reset(new TcpConn(address, connectTimeout, maxBuffSizePool));
}
}
@@ -492,11 +520,11 @@ ConnErrType TcrConnection::receiveData(
char* buffer, const size_t length,
const std::chrono::microseconds timeout) {
try {
- const auto readBytes = m_conn->receive(
+ const auto readBytes = conn_->receive(
buffer, length,
std::chrono::duration_cast<std::chrono::milliseconds>(timeout));
- m_poolDM->getStats().incReceivedBytes(static_cast<int64_t>(readBytes));
+ poolDM_->getStats().incReceivedBytes(static_cast<int64_t>(readBytes));
} catch (boost::system::system_error& ex) {
switch (ex.code().value()) {
case boost::asio::error::eof:
@@ -515,9 +543,8 @@ ConnErrType TcrConnection::receiveData(
ConnErrType TcrConnection::sendData(const char* buffer, size_t length,
std::chrono::microseconds timeout) {
try {
- m_conn->send(
- buffer, length,
- std::chrono::duration_cast<std::chrono::milliseconds>(timeout));
+ conn_->send(buffer, length,
+
std::chrono::duration_cast<std::chrono::milliseconds>(timeout));
} catch (boost::system::system_error& ex) {
switch (ex.code().value()) {
case boost::asio::error::operation_aborted:
@@ -611,7 +638,7 @@ void TcrConnection::send(const char* buffer, size_t len,
std::chrono::microseconds sendTimeoutSec, bool) {
LOGDEBUG(
"TcrConnection::send: [%p] sending request to endpoint %s; bytes: %s",
- this, m_endpointObj->name().c_str(),
+ this, endpointObj_->name().c_str(),
Utils::convertBytesToString(buffer, len).c_str());
switch (sendData(buffer, len, sendTimeoutSec)) {
@@ -679,10 +706,10 @@ char* TcrConnection::readMessage(size_t* recvLen,
LOGDEBUG(
"TcrConnection::readMessage(%p): received header from endpoint %s; "
"bytes: %s",
- this, m_endpointObj->name().c_str(),
+ this, endpointObj_->name().c_str(),
Utils::convertBytesToString(msg_header, HEADER_LENGTH).c_str());
- auto input = m_connectionManager.getCacheImpl()->createDataInput(
+ auto input = connectionManager_.getCacheImpl()->createDataInput(
reinterpret_cast<uint8_t*>(msg_header), HEADER_LENGTH);
// ignore msgType
input.readInt32();
@@ -733,7 +760,7 @@ char* TcrConnection::readMessage(size_t* recvLen,
LOGDEBUG(
"TcrConnection::readMessage: received message body from "
"endpoint %s; bytes: %s",
- m_endpointObj->name().c_str(),
+ endpointObj_->name().c_str(),
Utils::convertBytesToString(fullMessage + HEADER_LENGTH,
msgLen).c_str());
return fullMessage;
@@ -748,7 +775,7 @@ void TcrConnection::readMessageChunked(TcrMessageReply&
reply,
LOGFINER(
"TcrConnection::readMessageChunked: receiving reply from "
"endpoint %s",
- m_endpointObj->name().c_str());
+ endpointObj_->name().c_str());
auto responseHeader = readResponseHeader(headerTimeout);
@@ -761,7 +788,7 @@ void TcrConnection::readMessageChunked(TcrMessageReply&
reply,
// indicate an end to chunk processing and wait for processing
// to end even if reading the chunks fails in middle
FinalizeProcessChunk endProcessChunk(reply,
-
m_endpointObj->getDistributedMemberID());
+ endpointObj_->getDistributedMemberID());
auto header = responseHeader.header;
try {
@@ -782,7 +809,7 @@ void TcrConnection::readMessageChunked(TcrMessageReply&
reply,
LOGFINER(
"TcrConnection::readMessageChunked: read full reply "
"from endpoint %s",
- m_endpointObj->name().c_str());
+ endpointObj_->name().c_str());
}
std::chrono::microseconds TcrConnection::calculateHeaderTimeout(
@@ -816,10 +843,10 @@ chunkedResponseHeader TcrConnection::readResponseHeader(
LOGDEBUG(
"TcrConnection::readResponseHeader(%p): received header from "
"endpoint %s; bytes: %s",
- this, m_endpointObj->name().c_str(),
+ this, endpointObj_->name().c_str(),
Utils::convertBytesToString(receiveBuffer, HEADER_LENGTH).c_str());
- auto input = m_connectionManager.getCacheImpl()->createDataInput(
+ auto input = connectionManager_.getCacheImpl()->createDataInput(
receiveBuffer, HEADER_LENGTH);
header.messageType = input.readInt32();
header.numberOfParts = input.readInt32();
@@ -858,10 +885,10 @@ chunkHeader
TcrConnection::readChunkHeader(std::chrono::microseconds timeout) {
LOGDEBUG(
"TcrConnection::readChunkHeader: received header from "
"endpoint %s; bytes: %s",
- m_endpointObj->name().c_str(),
+ endpointObj_->name().c_str(),
Utils::convertBytesToString(receiveBuffer, CHUNK_HEADER_LENGTH).c_str());
- auto input = m_connectionManager.getCacheImpl()->createDataInput(
+ auto input = connectionManager_.getCacheImpl()->createDataInput(
receiveBuffer, CHUNK_HEADER_LENGTH);
header.chunkLength = input.readInt32();
header.flags = input.read();
@@ -893,7 +920,7 @@ std::vector<uint8_t> TcrConnection::readChunkBody(
LOGDEBUG(
"TcrConnection::readChunkBody(%p): received chunk body from endpoint "
"%s; bytes: %s",
- this, m_endpointObj->name().c_str(),
+ this, endpointObj_->name().c_str(),
Utils::convertBytesToString(chunkBody.data(), chunkLength).c_str());
return chunkBody;
}
@@ -907,9 +934,9 @@ bool TcrConnection::processChunk(TcrMessageReply& reply,
std::vector<uint8_t> chunkBody = readChunkBody(timeout, chunkLength);
// Process the chunk; the actual processing is done by a separate thread
- // ThinClientBaseDM::m_chunkProcessor.
+ // ThinClientBaseDM::chunkProcessor_.
reply.processChunk(chunkBody, chunkLength,
- m_endpointObj->getDistributedMemberID(),
+ endpointObj_->getDistributedMemberID(),
lastChunkAndSecurityFlags);
// Return boolean indicating whether or not there are more chunks, i.e.
// the *inverse* of the flag indicating this is the last chunk. It's a
@@ -918,15 +945,15 @@ bool TcrConnection::processChunk(TcrMessageReply& reply,
}
void TcrConnection::close() {
- auto cache = m_poolDM->getConnectionManager().getCacheImpl();
+ auto cache = poolDM_->getConnectionManager().getCacheImpl();
TcrMessageCloseConnection closeMsg{
std::unique_ptr<DataOutput>(
- new DataOutput(cache->createDataOutput(m_poolDM))),
- cache->isKeepAlive() || m_poolDM->isKeepAlive()};
+ new DataOutput(cache->createDataOutput(poolDM_))),
+ cache->isKeepAlive() || poolDM_->isKeepAlive()};
try {
if (!TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH &&
- !m_connectionManager.isNetDown()) {
+ !connectionManager_.isNetDown()) {
send(closeMsg.getMsgData(), closeMsg.getMsgLength(),
std::chrono::seconds(2), false);
}
@@ -940,17 +967,15 @@ void TcrConnection::close() {
std::vector<int8_t> TcrConnection::readHandshakeData(
int32_t msgLength, std::chrono::microseconds connectTimeout) {
ConnErrType error = CONN_NOERR;
- if (msgLength < 0) {
- msgLength = 0;
- }
- std::vector<int8_t> message(msgLength + 1);
- message.data()[msgLength] = '\0';
- if (msgLength == 0) {
- return message;
+ if (msgLength <= 0) {
+ return std::vector<int8_t>();
}
+
+ std::vector<int8_t> message(msgLength);
+
if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength,
connectTimeout)) != CONN_NOERR) {
- m_conn.reset();
+ conn_.reset();
if (error & CONN_TIMEOUT) {
throwException(
TimeoutException("TcrConnection::TcrConnection: "
@@ -973,13 +998,13 @@ int32_t TcrConnection::readHandshakeArraySize(
int32_t arrayLength = static_cast<uint8_t>(arrayLenHeader[0]);
if (static_cast<int8_t>(arrayLenHeader[0]) == -2) {
auto arrayLengthBytes = readHandshakeData(2, connectTimeout);
- auto dataInput2 = m_connectionManager.getCacheImpl()->createDataInput(
+ auto dataInput2 = connectionManager_.getCacheImpl()->createDataInput(
reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()),
arrayLengthBytes.size());
arrayLength = dataInput2.readInt16();
} else if (static_cast<int8_t>(arrayLenHeader[0]) == -3) {
auto arrayLengthBytes = readHandshakeData(4, connectTimeout);
- auto dataInput2 = m_connectionManager.getCacheImpl()->createDataInput(
+ auto dataInput2 = connectionManager_.getCacheImpl()->createDataInput(
reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()),
arrayLengthBytes.size());
arrayLength = dataInput2.readInt32();
@@ -1027,13 +1052,13 @@ void TcrConnection::readHandShakeBytes(
connectTimeout)) != CONN_NOERR) {
if (error & CONN_TIMEOUT) {
_GEODE_SAFE_DELETE_ARRAY(recvMessage);
- m_conn.reset();
+ conn_.reset();
throwException(
TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake"));
} else {
_GEODE_SAFE_DELETE_ARRAY(recvMessage);
- m_conn.reset();
+ conn_.reset();
throwException(
GeodeIOException("TcrConnection::TcrConnection: "
"Handshake failure"));
@@ -1049,7 +1074,7 @@ std::shared_ptr<CacheableString>
TcrConnection::readHandshakeString(
char cstypeid;
if (receiveData(&cstypeid, 1, connectTimeout) != CONN_NOERR) {
- m_conn.reset();
+ conn_.reset();
if (error & CONN_TIMEOUT) {
LOGFINE("Timeout receiving string typeid");
throwException(
@@ -1072,7 +1097,7 @@ std::shared_ptr<CacheableString>
TcrConnection::readHandshakeString(
}
case DSCode::CacheableASCIIString: {
auto lenBytes = readHandshakeData(2, connectTimeout);
- auto lenDI = m_connectionManager.getCacheImpl()->createDataInput(
+ auto lenDI = connectionManager_.getCacheImpl()->createDataInput(
reinterpret_cast<const uint8_t*>(lenBytes.data()), lenBytes.size());
length = lenDI.readInt16();
@@ -1127,7 +1152,7 @@ std::shared_ptr<CacheableString>
TcrConnection::readHandshakeString(
case DSCode::CacheableUserData4:
case DSCode::PDX:
case DSCode::PDX_ENUM: {
- m_conn.reset();
+ conn_.reset();
throwException(
GeodeIOException("TcrConnection::TcrConnection: "
"Handshake failure: Unexpected string type ID"));
@@ -1146,13 +1171,13 @@ std::shared_ptr<CacheableString>
TcrConnection::readHandshakeString(
if ((error = receiveData(recvMessage.data(), length, connectTimeout)) !=
CONN_NOERR) {
if (error & CONN_TIMEOUT) {
- m_conn.reset();
+ conn_.reset();
LOGFINE("Timeout receiving string data");
throwException(
TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake reading string bytes"));
} else {
- m_conn.reset();
+ conn_.reset();
LOGFINE("IO error receiving string data");
throwException(
GeodeIOException("TcrConnection::TcrConnection: "
@@ -1171,8 +1196,8 @@ bool TcrConnection::hasExpired(const
std::chrono::milliseconds& expiryTime) {
return false;
}
auto variadicExpiryTime =
- expiryTime + (expiryTime * m_expiryTimeVariancePercentage) / 100;
- return (std::chrono::steady_clock::now() - m_creationTime) >
+ expiryTime + (expiryTime * expiryTimeVariancePercentage_) / 100;
+ return (std::chrono::steady_clock::now() - creationTime_) >
variadicExpiryTime;
}
@@ -1181,15 +1206,15 @@ bool TcrConnection::isIdle(const
std::chrono::milliseconds& idleTime) {
return false;
}
- return (std::chrono::steady_clock::now() - m_lastAccessed) > idleTime;
+ return (std::chrono::steady_clock::now() - lastAccessed_) > idleTime;
}
void TcrConnection::touch() {
- m_lastAccessed = std::chrono::steady_clock::now();
+ lastAccessed_ = std::chrono::steady_clock::now();
}
std::chrono::steady_clock::time_point TcrConnection::getLastAccessed() {
- return m_lastAccessed;
+ return lastAccessed_;
}
uint8_t TcrConnection::getOverrides(const SystemProperties* props) {
@@ -1206,7 +1231,7 @@ uint8_t TcrConnection::getOverrides(const
SystemProperties* props) {
}
void TcrConnection::updateCreationTime() {
- m_creationTime = std::chrono::steady_clock::now();
+ creationTime_ = std::chrono::steady_clock::now();
touch();
}
@@ -1218,28 +1243,28 @@ bool TcrConnection::setAndGetBeingUsed(volatile bool
isBeingUsed,
if (!forTransaction) {
if (isBeingUsed) {
- if (m_isUsed == 1 || m_isUsed == 2) return false;
- if (m_isUsed.compare_exchange_strong(currentValue, 1)) return true;
+ if (isUsed_ == 1 || isUsed_ == 2) return false;
+ if (isUsed_.compare_exchange_strong(currentValue, 1)) return true;
return false;
} else {
- m_isUsed = 0;
+ isUsed_ = 0;
return true;
}
} else {
if (isBeingUsed) {
- if (m_isUsed == 1) { // already used
+ if (isUsed_ == 1) { // already used
return false;
}
- if (m_isUsed == 2) { // transaction thread has set, reused it
+ if (isUsed_ == 2) { // transaction thread has set, reused it
return true;
}
- if (m_isUsed.compare_exchange_strong(currentValue,
- 2 /*for transaction*/)) {
+ if (isUsed_.compare_exchange_strong(currentValue,
+ 2 /*for transaction*/)) {
return true;
}
return false;
} else {
- // m_isUsed = 0;//this will done by releasing the connection by
+ // isUsed_ = 0;//this will done by releasing the connection by
// transaction at the end of transaction
return true;
}
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index fb7562d..bc26bc4 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -238,13 +238,13 @@ class TcrConnection {
// Durable clients: return true if server has HA queue.
ServerQueueStatus inline getServerQueueStatus(int32_t& queueSize) {
- queueSize = m_queueSize;
- return m_hasServerQueue;
+ queueSize = queueSize_;
+ return hasServerQueue_;
}
- uint16_t inline getPort() { return m_port; }
+ uint16_t inline getPort() { return port_; }
- TcrEndpoint* getEndpointObject() const { return m_endpointObj.get(); }
+ TcrEndpoint* getEndpointObject() const { return endpointObj_.get(); }
bool setAndGetBeingUsed(volatile bool isBeingUsed, bool forTransaction);
@@ -266,13 +266,13 @@ class TcrConnection {
}
const TcrConnectionManager& getConnectionManager() {
- return m_connectionManager;
+ return connectionManager_;
}
private:
int64_t connectionId;
- const TcrConnectionManager& m_connectionManager;
- int m_expiryTimeVariancePercentage = 0;
+ const TcrConnectionManager& connectionManager_;
+ int expiryTimeVariancePercentage_ = 0;
std::chrono::microseconds calculateHeaderTimeout(
std::chrono::microseconds receiveTimeout, bool retry);
@@ -341,24 +341,24 @@ class TcrConnection {
ConnErrType receiveData(char* buffer, size_t length,
std::chrono::microseconds receiveTimeoutSec);
- std::shared_ptr<TcrEndpoint> m_endpointObj;
- std::unique_ptr<Connector> m_conn;
- ServerQueueStatus m_hasServerQueue;
- int32_t m_queueSize;
- uint16_t m_port;
+ std::shared_ptr<TcrEndpoint> endpointObj_;
+ std::unique_ptr<Connector> conn_;
+ ServerQueueStatus hasServerQueue_;
+ int32_t queueSize_;
+ uint16_t port_;
// semaphore to synchronize with the chunked response processing thread
binary_semaphore chunks_process_semaphore_;
- std::chrono::steady_clock::time_point m_creationTime;
- std::chrono::steady_clock::time_point m_lastAccessed;
+ std::chrono::steady_clock::time_point creationTime_;
+ std::chrono::steady_clock::time_point lastAccessed_;
// Disallow copy constructor and assignment operator.
TcrConnection(const TcrConnection&);
TcrConnection& operator=(const TcrConnection&);
- volatile bool m_isBeingUsed;
- std::atomic<uint32_t> m_isUsed;
- ThinClientPoolDM* m_poolDM;
+ volatile bool isBeingUsed_;
+ std::atomic<uint32_t> isUsed_;
+ ThinClientPoolDM* poolDM_;
std::chrono::microseconds sendWithTimeouts(
const char* data, size_t len, std::chrono::microseconds sendTimeout,
std::chrono::microseconds receiveTimeout);
diff --git a/tools/gnmsg/client_message_decoder.py
b/tools/gnmsg/client_message_decoder.py
index 20e76f0..5e49566 100644
--- a/tools/gnmsg/client_message_decoder.py
+++ b/tools/gnmsg/client_message_decoder.py
@@ -17,6 +17,7 @@ import re
import struct
import sys
+from collections import OrderedDict
from dateutil import parser
from client_messages import parse_client_message
@@ -113,18 +114,18 @@ class ClientMessageDecoder(DecoderBase):
parts.append(parser.parse(match.group(1)))
parts.append(match.group(2))
parts.append(match.group(3))
- parts.append(match.group(4))
- parts.append(match.group(5))
+ parts.append(match.group(4))
+ parts.append(match.group(5))
result = True
else:
- match = self.send_trace_expression_base_.search(line)
- if match:
- parts.append(parser.parse(match.group(1)))
- parts.append(match.group(2))
- parts.append("")
- parts.append(match.group(3))
- parts.append(match.group(4))
- result = True
+ match = self.send_trace_expression_base_.search(line)
+ if match:
+ parts.append(parser.parse(match.group(1)))
+ parts.append(match.group(2))
+ parts.append("")
+ parts.append(match.group(3))
+ parts.append(match.group(4))
+ result = True
return result
@@ -217,7 +218,7 @@ class ClientMessageDecoder(DecoderBase):
connection = None
is_send_trace = False
is_add_security_trace = False
- send_trace = {}
+ send_trace = OrderedDict()
if not self.is_candidate_line(line):
return
@@ -232,7 +233,7 @@ class ClientMessageDecoder(DecoderBase):
message_bytes,
) = parts
if send_trace["ThreadName"] == "":
- del(send_trace["ThreadName"])
+ del send_trace["ThreadName"]
is_send_trace = True
elif self.get_add_security_trace_parts(line, parts):
timestamp, tid, connection, security_footer_length, message_bytes
= parts
diff --git a/tools/gnmsg/gnmsg.py b/tools/gnmsg/gnmsg.py
index d2fd9ad..933cf06 100755
--- a/tools/gnmsg/gnmsg.py
+++ b/tools/gnmsg/gnmsg.py
@@ -47,17 +47,17 @@ def scan_opened_file(
try:
data = output_queue.get_nowait()
for key, value in data.items():
- if key == "message" and dump_messages:
- if thread_id:
- if "tid" in value.keys() and value["tid"] == thread_id:
- print(separator + json.dumps(value, indent=2,
default=str))
- separator = ","
- else:
+ if (
+ "tid" not in value.keys()
+ or ("tid" in value.keys() and value["tid"] == thread_id)
+ or thread_id is None
+ ):
+ if key == "message" and dump_messages:
+ print(separator + json.dumps(value, indent=2,
default=str))
+ separator = ","
+ elif key == "handshake" and dump_handshake:
print(separator + json.dumps(value, indent=2,
default=str))
separator = ","
- elif key == "handshake" and dump_handshake:
- print(separator + json.dumps(value, indent=2, default=str))
- separator = ","
except queue.Empty:
continue
diff --git a/tools/gnmsg/handshake_acceptance_codes.py
b/tools/gnmsg/handshake_acceptance_codes.py
new file mode 100644
index 0000000..1289664
--- /dev/null
+++ b/tools/gnmsg/handshake_acceptance_codes.py
@@ -0,0 +1,26 @@
+#!/usr/local/bin/python3
+
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+server_handshake_reply_codes = {
+ 21: "REPLY_SSL_ENABLED",
+ 59: "REPLY_OK",
+ 60: "REPLY_REFUSED",
+ 61: "REPLY_INVALID",
+ 62: "REPLY_AUTHENTICATION_REQUIRED",
+ 63: "REPLY_AUTHENTICATION_FAILED",
+ 64: "REPLY_DUPLICATE_DURABLE_CLIENT",
+ 105: "SUCCESSFUL_SERVER_TO_CLIENT",
+ 106: "UNSUCCESSFUL_SERVER_TO_CLIENT",
+}
diff --git a/tools/gnmsg/handshake_decoder.py b/tools/gnmsg/handshake_decoder.py
index 669bf0c..331c121 100644
--- a/tools/gnmsg/handshake_decoder.py
+++ b/tools/gnmsg/handshake_decoder.py
@@ -15,6 +15,7 @@
# limitations under the License.
import re
+from collections import OrderedDict
from dateutil import parser
from decoder_base import DecoderBase
@@ -22,15 +23,16 @@ from ds_codes import ds_codes
from ds_fids import ds_fids
from modified_utf8 import utf8m_to_utf8s
from connection_types import ConnectionTypes, ConnectionTypeStrings
+from handshake_acceptance_codes import server_handshake_reply_codes
from read_values import (
call_reader_function,
read_boolean_value,
read_byte_array,
+ read_byte_array_with_length,
read_byte_value,
read_cacheable_ascii_string_value,
read_fixed_id_byte_value,
- read_geode_jmutf8_string_value,
read_int_value,
read_short_value,
read_cacheable_string_value,
@@ -52,17 +54,20 @@ class HandshakeDecoder(DecoderBase):
3: "SECURITY_MULTIUSER_NOTIFICATIONCHANNEL",
}
self.client_connection_request_expression_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*([\d|a-f|A-F|x|X]+)
.*\]\s*ThinClientLocatorHelper::sendRequest\([0-9|a-f|A-F|x|X]+\): sending \d+
bytes to locator:\s*([0-9|a-f|A-F]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).* ([\d]+)
.*\]\s*ThinClientLocatorHelper::sendRequest\([0-9|a-f|A-F|x|X]+\): sending \d+
bytes to locator:\s*([0-9|a-f|A-F]+)"
)
self.client_connection_response_expression_ = re.compile(
- r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).*([\d|a-f|A-F|x|X]+)
.*\]\s*ThinClientLocatorHelper::sendRequest\([0-9|a-f|A-F|x|X]+\): received \d+
bytes from locator:\s*([0-9|a-f|A-F]+)"
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).* ([\d]+)
.*\]\s*ThinClientLocatorHelper::sendRequest\([0-9|a-f|A-F|x|X]+\): received \d+
bytes from locator:\s*([0-9|a-f|A-F]+)"
)
- self.server_handshake_expression_ = expression = re.compile(
- r"Handshake bytes: \(\d+\):\s*([0-9|a-f|A-F]+)"
+ self.server_handshake_request_expression_ = expression = re.compile(
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).* ([\d]+)
.*\]\s*Handshake bytes: \(\d+\):\s*([0-9|a-f|A-F]+)"
+ )
+ self.server_handshake_response_expression_ = expression = re.compile(
+ r"(\d\d\d\d\/\d\d\/\d\d \d\d:\d\d:\d\d\.\d+).* ([\d]+)
.*\].*isClientNotification=([t|r|u|e|f|a|l|s]+),\s+Handshake response bytes:
\(\d+\)\s*([0-9|a-f|A-F]+)"
)
def is_candidate_line(self, line):
- return "Helper::sendR" in line or "ake bytes:" in line
+ return "Helper::sendR" in line or "ake bytes:" in line or "onse
bytes:" in line
def is_locator_request(self, line):
match = self.client_connection_request_expression_.search(line)
@@ -100,21 +105,39 @@ class HandshakeDecoder(DecoderBase):
return result
- def is_server_handshake_trace(self, line):
- match = self.server_handshake_expression_.search(line)
+ def is_server_handshake_request(self, line):
+ match = self.server_handshake_request_expression_.search(line)
if match:
return True
else:
return False
- def get_server_handshake_bytes(self, line):
- match = self.server_handshake_expression_.search(line)
+ def get_server_handshake_request_parts(self, line, parts):
+ match = self.server_handshake_request_expression_.search(line)
if match:
- return match.group(1)
+ parts.append(parser.parse(match.group(1)))
+ parts.append(match.group(2))
+ parts.append(match.group(3))
+ else:
+ exit(1)
+
+ def is_server_handshake_response(self, line):
+ match = self.server_handshake_response_expression_.search(line)
+ if match:
+ return True
+ else:
+ return False
+
+ def get_server_handshake_response_parts(self, line, parts):
+ match = self.server_handshake_response_expression_.search(line)
+ if match:
+ parts.append(parser.parse(match.group(1)))
+ parts.append(match.group(2))
+ parts.append(match.group(3))
+ parts.append(match.group(4))
else:
exit(1)
- # TODO: Find a handshake that uses a list and implement this function to
parse it
def read_list_of_ports(self, string, offset):
(number_of_ports, offset) = call_reader_function(string, offset,
read_int_value)
ports = []
@@ -247,7 +270,14 @@ class HandshakeDecoder(DecoderBase):
return (self.credentials_types[credential_type], offset)
def get_server_handshake_info(self, line, handshake_info):
- handshake_bytes = self.get_server_handshake_bytes(line)
+ parts = []
+ self.get_server_handshake_request_parts(line, parts)
+ handshake_info["Timestamp"] = parts[0]
+ handshake_info["tid"] = parts[1]
+ handshake_info["Direction"] = "--->"
+ handshake_info["Type"] = "ServerHandshakeRequest"
+ handshake_bytes = parts[2]
+
(connection_type, offset) = call_reader_function(
handshake_bytes, 0, read_byte_value
)
@@ -578,17 +608,78 @@ class HandshakeDecoder(DecoderBase):
pass
# TODO: decode other response types
+ def decode_server_handshake_response(self, line, handshake_response):
+ parts = []
+ self.get_server_handshake_response_parts(line, parts)
+
+ handshake_response["Timestamp"] = parts[0]
+ handshake_response["tid"] = parts[1]
+ handshake_response["Direction"] = "<---"
+ handshake_response["Type"] = "ServerHandshakeResponse"
+
+ is_client_notification = True if parts[2] == "true" else False
+ handshake_response["IsClientNotification"] =
str(is_client_notification)
+
+ response_bytes = parts[3]
+ offset = 0
+ (acceptance_code, offset) = call_reader_function(
+ response_bytes, offset, read_byte_value
+ )
+ handshake_response["AcceptanceCode"] = server_handshake_reply_codes[
+ acceptance_code
+ ]
+
+ (server_queue_status, offset) = call_reader_function(
+ response_bytes, offset, read_byte_value
+ )
+ if server_queue_status == 1:
+ handshake_response["ServerQueueStatus"] = "REDUNDANT_SERVER"
+ elif server_queue_status == 2:
+ handshake_response["ServerQueueStatus"] = "PRIMARY_SERVER"
+ else:
+ handshake_response["ServerQueueStatus"] = "NON_REDUNDANT_SERVER"
+
+ (handshake_response["QueueSize"], offset) = call_reader_function(
+ response_bytes, offset, read_int_value
+ )
+
+ if not is_client_notification:
+ (receive_message_length, offset) = call_reader_function(
+ response_bytes, offset, read_int_value
+ )
+ (receive_message, offset) = read_byte_array_with_length(
+ response_bytes, offset, receive_message_length
+ )
+ handshake_response["ReceiveMessage"] = receive_message
+
+ (receive_message_length, offset) = call_reader_function(
+ response_bytes, offset, read_short_value
+ )
+ (receive_message, offset) = read_byte_array_with_length(
+ response_bytes, offset, receive_message_length
+ )
+ handshake_response["ReceiveMessage2"] = receive_message
+
+ if not is_client_notification:
+ (delta_enabled, offset) = call_reader_function(
+ response_bytes, offset, read_byte_value
+ )
+ handshake_response["DeltaEnabled"] = "True" if delta_enabled else
"False"
+
def process_line(self, line):
if not self.is_candidate_line(line):
return
- handshake = {}
+ handshake = OrderedDict()
if self.is_locator_request(line):
self.decode_locator_request(line, handshake)
self.output_queue_.put({"handshake": handshake})
elif self.is_locator_response(line):
self.decode_locator_response(line, handshake)
self.output_queue_.put({"handshake": handshake})
- elif self.is_server_handshake_trace(line):
+ elif self.is_server_handshake_request(line):
self.get_server_handshake_info(line, handshake)
self.output_queue_.put({"handshake": handshake})
+ elif self.is_server_handshake_response(line):
+ self.decode_server_handshake_response(line, handshake)
+ self.output_queue_.put({"handshake": handshake})
diff --git a/tools/gnmsg/read_values.py b/tools/gnmsg/read_values.py
index fc8f4ea..e2e9d9a 100644
--- a/tools/gnmsg/read_values.py
+++ b/tools/gnmsg/read_values.py
@@ -79,6 +79,15 @@ def read_byte_array(string, offset):
return byte_string, offset + (array_length * 2)
+def read_byte_array_with_length(string, offset, array_length):
+ byte_string = ""
+ for i in range(offset, offset + (array_length * 2), 2):
+ byte_string += string[i : i + 2]
+ byte_string += " "
+ byte_string = byte_string[:-1]
+ return byte_string, offset + (array_length * 2)
+
+
def read_boolean_value(message_bytes, offset):
(bool_val, offset) = call_reader_function(message_bytes, offset,
read_byte_value)
bool_string = "True" if bool_val == 1 else "False"
@@ -153,7 +162,7 @@ def read_geode_jmutf8_string_value(buffer, offset,
string_length):
string = []
bad_length = IndexError("Insufficient length for JM utf-8 string")
- while cursor < string_length:
+ while cursor < offset + (string_length * 2):
code_point, cursor = call_reader_function(buffer, cursor,
read_byte_value)
if code_point == 0:
raise TypeError("Should not encounter a 0 byte in JM utf-8")
diff --git a/tools/gnmsg/server_message_decoder.py
b/tools/gnmsg/server_message_decoder.py
index ca4ce47..38b92e6 100644
--- a/tools/gnmsg/server_message_decoder.py
+++ b/tools/gnmsg/server_message_decoder.py
@@ -18,6 +18,7 @@ import re
import struct
import sys
+from collections import OrderedDict
from dateutil import parser
from server_messages import parse_server_message
@@ -357,7 +358,9 @@ class ServerMessageDecoder(DecoderBase):
pass
elif self.get_receive_trace_parts(line, parts):
tid = parts[1]
- last_header = {"Timestamp": parts[0], "tid": tid, "Connection":
parts[2]}
+ last_header = OrderedDict[
+ "Timestamp" : parts[0], "tid":tid, "Connection" : parts[2]
+ ]
message_bytes = parts[3]
self.headers_[tid] = last_header
if (
@@ -383,9 +386,7 @@ class ServerMessageDecoder(DecoderBase):
self.chunk_decoders_[tid].add_header(parts[2], parts[4])
if self.chunk_decoders_[tid].is_complete_message():
- receive_trace = self.chunk_decoders_[tid].get_decoded_message(
- parts[0]
- )
+ receive_trace =
self.chunk_decoders_[tid].get_decoded_message(parts[0])
receive_trace["tid"] = str(tid)
self.output_queue_.put({"message": receive_trace})
self.chunk_decoders_[tid].reset()