Author: veithm
Date: Thu Mar 20 09:05:30 2014
New Revision: 1579568
URL: http://svn.apache.org/r1579568
Log:
ETCH-278 Enable receiving of multiple packets in one buffer in EtchPacketizer
Until now the packetizer was not able to handle more than one packet in a
buffer received from the transport layer.
This patch introduces a new algorithm to handle packets and adds tests
to ensure it works for all possible situations.
Change-Id: I7ba4b04a012ed8778861dc734ba2574d2b487ac9
Modified:
etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h
etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h
etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp
etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp
etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp
etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt
etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp
etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp
Modified: etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h
URL:
http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h (original)
+++ etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h Thu Mar
20 09:05:30 2014
@@ -18,6 +18,8 @@
#ifndef __ETCHFLEXBUFFER_H__
#define __ETCHFLEXBUFFER_H__
+
+#include "capu/util/SmartPointer.h"
#include "common/EtchConfig.h"
#include "common/EtchError.h"
#include "transport/EtchByteRepresentation.h"
@@ -90,7 +92,7 @@ public:
* ETCH_EINVAL if the byte representation is not BIG ENDIAN or
LITTLE ENDIAN
* ETCH_ERANGE if there is not enough value to be read (less than 4
byte)
*/
- status_t getInteger(capu::int32_t & value);
+ status_t getInteger(capu::uint32_t & value);
/**
* gets an byte value from flexible buffer
@@ -322,6 +324,7 @@ private:
void copy(capu::int8_t *buffer, capu::uint32_t numBytes);
};
+typedef capu::SmartPointer<EtchFlexBuffer> EtchFlexBufferPtr;
#endif /* ETCHFLEXBUFFER_H */
Modified: etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h
URL:
http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h (original)
+++ etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h Thu Mar
20 09:05:30 2014
@@ -51,8 +51,12 @@ public:
const static EtchString& MAX_PKT_SIZE_TERM();
const static capu::int32_t& SIG();
-
+
+ const static capu::uint32_t& SIG_SIZE();
+
const static capu::uint32_t& HEADER_SIZE();
+
+
/**
* Constructs the Packetizer with null packet handler and uri specified
@@ -72,6 +76,8 @@ public:
*/
EtchPacketizer(EtchRuntime* runtime, EtchTransportData* transport, EtchURL*
uri);
+
+public:
/**
* Destructor
*/
@@ -146,13 +152,16 @@ private:
capu::uint32_t mMaxPktSize;
- capu::uint32_t mBodyLen;
-
- capu::bool_t mWantHeader;
-
- capu::SmartPointer<EtchFlexBuffer> mSavedBuf;
-
- status_t processHeader(EtchFlexBuffer* buf, capu::bool_t reset,
capu::uint32_t &pktSize);
+ EtchFlexBufferPtr mReadBuf;
+ EtchFlexBufferPtr mTempBuf;
+
+
+ capu::bool_t bufferContainsPacket();
+ capu::bool_t bufferContainsHeader();
+ void bufferClean();
+ EtchFlexBufferPtr extractPacket();
+ void handleCorruptedBuffer();
+
};
Modified:
etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp
URL:
http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
---
etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp
(original)
+++
etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp
Thu Mar 20 09:05:30 2014
@@ -473,7 +473,7 @@ status_t EtchBinaryTaggedDataInput::read
case EtchTypeCode::INT:
{
- capu::int32_t tmp;
+ capu::uint32_t tmp;
if (mBuffer->getInteger(tmp) != ETCH_OK)
return ETCH_ERROR;
capu::SmartPointer<EtchInt32> intNum = new EtchInt32(tmp);
Modified: etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp
URL:
http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp
(original)
+++ etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp Thu
Mar 20 09:05:30 2014
@@ -56,20 +56,20 @@ capu::uint32_t EtchFlexBuffer::getIndex(
return mIndex;
}
-status_t EtchFlexBuffer::getInteger(capu::int32_t & value) {
+status_t EtchFlexBuffer::getInteger(capu::uint32_t & value) {
- if (sizeof (capu::int32_t) > getAvailableBytes())
+ if (sizeof (capu::uint32_t) > getAvailableBytes())
return ETCH_ERANGE;
if (mByteRepresentation == ETCH_LITTLE_ENDIAN) {
// little-endian
- capu::int32_t result = (mBuffer[mIndex++] & 255);
+ capu::uint32_t result = (mBuffer[mIndex++] & 255);
result += (mBuffer[mIndex++] & 255) << 8;
result += (mBuffer[mIndex++] & 255) << 16;
value = result + ((mBuffer[mIndex++] & 255) << 24);
} else if (mByteRepresentation == ETCH_BIG_ENDIAN) {
// big-endian
- capu::int32_t result = mBuffer[mIndex++] & 255;
+ capu::uint32_t result = mBuffer[mIndex++] & 255;
result = (result << 8) + (mBuffer[mIndex++] & 255);
result = (result << 8) + (mBuffer[mIndex++] & 255);
value = (result << 8) + (mBuffer[mIndex++] & 255);
@@ -136,7 +136,7 @@ status_t EtchFlexBuffer::getLong(capu::i
}
status_t EtchFlexBuffer::getFloat(capu::float_t & value) {
- capu::int32_t tmp;
+ capu::uint32_t tmp;
status_t err = getInteger(tmp);
if (err != ETCH_OK)
return err;
@@ -219,11 +219,7 @@ status_t EtchFlexBuffer::put(EtchFlexBuf
copy(&buffer.mBuffer[buffer.mIndex], buffer.getAvailableBytes());
- if (mIndex > mLength)
- mLength = mIndex;
-
return ETCH_OK;
-
}
status_t EtchFlexBuffer::put(EtchFlexBuffer & buffer, capu::uint32_t numBytes)
{
Modified: etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp
URL:
http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp
(original)
+++ etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp Thu
Mar 20 09:05:30 2014
@@ -29,6 +29,12 @@ const capu::int32_t& EtchPacketizer::SIG
return sig;
}
+const capu::uint32_t& EtchPacketizer::SIG_SIZE() {
+ static const capu::uint32_t sigSize(4);
+ return sigSize;
+}
+
+
const capu::int32_t& EtchPacketizer::DEFAULT_MAX_PKT_SIZE(){
static const capu::int32_t pktSize(16384 - EtchPacketizer::HEADER_SIZE());
return pktSize;
@@ -40,7 +46,7 @@ const EtchString& EtchPacketizer::MAX_PK
}
EtchPacketizer::EtchPacketizer(EtchRuntime* runtime, EtchTransportData*
transport, EtchString& uri)
-: mRuntime(runtime), mTransport(transport), mBodyLen(0), mWantHeader(true) {
+: mRuntime(runtime), mTransport(transport) {
if (mTransport != NULL)
mTransport->setSession(this);
@@ -55,11 +61,12 @@ EtchPacketizer::EtchPacketizer(EtchRunti
if (mMaxPktSize <= 0)
mMaxPktSize = DEFAULT_MAX_PKT_SIZE();
}
- mSavedBuf = new EtchFlexBuffer();
+ mReadBuf = new EtchFlexBuffer();
+ mTempBuf = new EtchFlexBuffer();
}
EtchPacketizer::EtchPacketizer(EtchRuntime* runtime, EtchTransportData*
transport, EtchURL* uri)
-: mRuntime(runtime), mTransport(transport), mBodyLen(0), mWantHeader(true) {
+: mRuntime(runtime), mTransport(transport) {
EtchString value("");
if (mTransport != NULL)
transport->setSession(this);
@@ -73,7 +80,8 @@ EtchPacketizer::EtchPacketizer(EtchRunti
if (mMaxPktSize <= 0)
mMaxPktSize = DEFAULT_MAX_PKT_SIZE();
}
- mSavedBuf = new EtchFlexBuffer();
+ mReadBuf = new EtchFlexBuffer();
+ mTempBuf = new EtchFlexBuffer();
}
EtchPacketizer::~EtchPacketizer() {
@@ -135,124 +143,118 @@ status_t EtchPacketizer::transportPacket
}
status_t EtchPacketizer::sessionData(capu::SmartPointer<EtchWho> sender,
capu::SmartPointer<EtchFlexBuffer> buf) {
-//TODO: compare with java version
-
+ status_t result = ETCH_OK;
+
+ if(mReadBuf->getAvailableBytes() > 0)
+ {
+ //If there is already data in the buffer, amend the new buffer by setting
the index to the end of the buffer
+ mReadBuf->setIndex(mReadBuf->getLength());
+ }
- // there are two options here. one is that we have no buffered data
- // and the entire packet is contained within the buf. in that case
- // we could optimize the daylights out of the process and directly
- // drop the packet on the handler.
-
- status_t result;
- if (buf->getAvailableBytes() > 0) {
- if (mWantHeader) {
- // do we have enough to make a header?
-
- if (mSavedBuf->getLength() + buf->getAvailableBytes() >= HEADER_SIZE()) {
- capu::uint32_t pktSize;
-
- if (mSavedBuf->getLength() == 0) {
- // savedBuf is empty, entire header in buf.
-
- result = processHeader(buf.get(), false, pktSize);
- if (result != ETCH_OK)
- return result;
- } else // header split across savedBuf and buf
- {
- // move just enough data from buf to savedBuf to have a header.
-
- capu::uint32_t needFromBuf = HEADER_SIZE() - mSavedBuf->getLength();
- mSavedBuf->put(*buf, needFromBuf);
- mSavedBuf->setIndex(0);
-
- result = processHeader(mSavedBuf.get(), true, pktSize);
- if (result != ETCH_OK)
- return result;
- }
-
- mBodyLen = pktSize;
- mWantHeader = false;
- } else // want header, but there's not enough to make it.
- {
- // save buf in savedBuf.
-
- mSavedBuf->setIndex(mSavedBuf->getLength());
- mSavedBuf->put(*buf);
- }
- }
- if (!mWantHeader && mSavedBuf->getLength() + buf->getAvailableBytes() >=
mBodyLen) {
- // want body, and there's enough to make it.
+ mReadBuf->put(*buf);
+ mReadBuf->setIndex(0);
- // three possible cases: the body is entirely in savedBuf,
- // the body is split, or the body is entirely in buf. assert
- // that the body cannot entirely be in savedBuf, or else
- // we'd have processed it last time.
-
- if (mSavedBuf->getLength() == 0) {
- // savedBuf is empty, entire body in buf.
-
- capu::uint32_t length = buf->getLength();
- capu::uint32_t index = buf->getIndex();
- buf->setLength(index + mBodyLen);
- ETCH_LOG_DEBUG(mRuntime->getLogger(),
mRuntime->getLogger().getPacketizerContext(), "Entire body is in buffer: header
is parsed and the body of message is sent to Messagizer");
- result = mSession->sessionPacket(sender, buf);
-
- if (result != ETCH_OK) {
- return result;
- }
-
- buf->setLength(length);
- buf->setIndex(index + mBodyLen);
-
- mWantHeader = true;
- } else // body split across savedBuf and buf
- {
- // move just enough data from buf to savedBuf to have a body.
-
- capu::uint32_t needFromBuf = mBodyLen - mSavedBuf->getLength();
- mSavedBuf->put(*buf, needFromBuf);
- mSavedBuf->setIndex(0);
-
- ETCH_LOG_DEBUG(mRuntime->getLogger(),
mRuntime->getLogger().getPacketizerContext(), "more data in buffer: header is
parsed and the body of message is sent to Messagizer");
- mSession->sessionPacket(sender, mSavedBuf);
-
- mSavedBuf->reset();
- mWantHeader = true;
- }
- } else if (!mWantHeader)// want body, but there's not enough to make it.
- {
- // save buf in savedBuf.
- mSavedBuf->setIndex(mSavedBuf->getLength());
- mSavedBuf->put(*buf);
- }
+ while(bufferContainsPacket())
+ {
+ EtchFlexBufferPtr packetData = extractPacket();
+ result = mSession->sessionPacket(sender, packetData);
}
- // buf is now empty, and there's nothing else to do.
- if (buf->getAvailableBytes() != 0)
- return ETCH_ERROR;
- return ETCH_OK;
+ return result;
}
-status_t EtchPacketizer::processHeader(EtchFlexBuffer* buf, capu::bool_t
reset, capu::uint32_t &pktSize) {
- capu::int32_t sig = 0;
- buf->getInteger(sig);
-
- if (sig != SIG()) {
- ETCH_LOG_ERROR(mRuntime->getLogger(),
mRuntime->getLogger().getPacketizerContext(), "SIG is not correct, message will
be discarded");
- return ETCH_ERROR;
+capu::bool_t EtchPacketizer::bufferContainsPacket()
+{
+ if(!bufferContainsHeader())
+ {
+ return false;
}
+ capu::uint32_t packetSize = 0;
+ if(mReadBuf->getInteger(packetSize) != ETCH_OK ||
mReadBuf->getAvailableBytes() < packetSize)
+ {
+ mReadBuf->setIndex(0);
+ return false;
+ }
+ if (packetSize > mMaxPktSize) {
+ ETCH_LOG_ERROR(mRuntime->getLogger(),
mRuntime->getLogger().getPacketizerContext(), "EtchPacketizer: PacketSize
exceeded Maximum Packetsize. MaximumSize: " << mMaxPktSize << " PacketSize: "
<< packetSize);
+ mReadBuf->setIndex(mReadBuf->getIndex() + packetSize);
+ bufferClean();
+ }
+
+ mReadBuf->setIndex(0);
+ return true;
+}
- buf->getInteger((capu::int32_t&)pktSize);
+capu::bool_t EtchPacketizer::bufferContainsHeader()
+{
+ if(mReadBuf->getAvailableBytes() < HEADER_SIZE())
+ return false;
+
+ capu::uint32_t sig = 0;
+ mReadBuf->getInteger(sig);
+
+ if(sig == SIG()) {
+ return true;
+ }
- if (reset)
- buf->reset();
+ ETCH_LOG_ERROR(mRuntime->getLogger(),
mRuntime->getLogger().getPacketizerContext(), "EtchPacketizer: Packet SIG is
incorrect - discarding package and searching for next Valid SIG.");
+
+ capu::uint32_t length = mReadBuf->getLength() - sizeof(capu::int32_t);
+ capu::uint32_t index = mReadBuf->getIndex();
+
+ for(capu::uint32_t i = index; i < length; ++i) {
+ mReadBuf->getInteger(sig);
+ if(sig == SIG()) {
+ ETCH_LOG_DEBUG(mRuntime->getLogger(),
mRuntime->getLogger().getPacketizerContext(), "EtchPacketizer: Found correct
SIG in Buffer after having a damaged Buffer.");
+ mReadBuf->setIndex(i); //we found a correct Header
+ bufferClean();
+ mReadBuf->setIndex(SIG_SIZE());
+ return true;
+ }
+ mReadBuf->setIndex(i+1);
+ }
- if (pktSize > mMaxPktSize) {
- ETCH_LOG_ERROR(mRuntime->getLogger(),
mRuntime->getLogger().getPacketizerContext(), "Packet size exceeds the maximum
packet size, message will be discarded");
- return ETCH_ERROR;
+ mReadBuf->setIndex(mReadBuf->getLength() - HEADER_SIZE()); //Save the last 8
Bytes
+ bufferClean();
+ return false;
+}
+
+void EtchPacketizer::bufferClean()
+{
+ mTempBuf->put(*mReadBuf);
+ mTempBuf->setIndex(0);
+
+ mReadBuf->reset();
+ mReadBuf->put(*mTempBuf);
+ mReadBuf->setIndex(0);
+
+ mTempBuf->reset();
+}
+
+EtchFlexBufferPtr EtchPacketizer::extractPacket()
+{
+ mReadBuf->setIndex(mReadBuf->getIndex() + 4);
+ capu::uint32_t packetSize = 0;
+ mReadBuf->getInteger(packetSize);
+
+ EtchFlexBufferPtr packetData = new EtchFlexBuffer();
+ packetData->put(*mReadBuf, packetSize);
+
+ mReadBuf->setIndex(mReadBuf->getIndex() + packetSize);
+ if(mReadBuf->getAvailableBytes() > 0)
+ {
+ mTempBuf->put(*mReadBuf);
+ mTempBuf->setIndex(0);
}
- return ETCH_OK;
+ mReadBuf->reset();
+ mReadBuf->put(*mTempBuf);
+ mReadBuf->setIndex(0);
+ mTempBuf->reset();
+ packetData->setIndex(0);
+ ETCH_LOG_DEBUG(mRuntime->getLogger(),
mRuntime->getLogger().getPacketizerContext(), "EtchPacketizer: extracted packet
- passing " << packetSize << " Bytes to Messagizer.")
+ return packetData;
}
+
EtchTransportData* EtchPacketizer::getTransport() {
return mTransport;
}
Modified: etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt (original)
+++ etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt Thu Mar 20 09:05:30
2014
@@ -110,6 +110,8 @@ ELSEIF(TARGET_OS STREQUAL "Windows")
target_link_libraries (etch-cpp-test etch-cpp capu)
ELSEIF(TARGET_OS STREQUAL "QNX")
target_link_libraries (etch-cpp-test etch-cpp capu c socket)
+ELSEIF(TARGET_OS STREQUAL "MacOSX")
+ target_link_libraries (etch-cpp-test etch-cpp capu)
ENDIF()
IF (TARGET_OS STREQUAL "Windows" AND BUILD_CHECK_MEMORY)
Modified:
etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp
URL:
http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp
(original)
+++ etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp
Thu Mar 20 09:05:30 2014
@@ -179,7 +179,7 @@ TEST(EtchFlexBufferTest, putAndgetIntege
EtchFlexBuffer *buffer = new EtchFlexBuffer();
capu::int8_t key [] = {'a', 'b', 'c'};
- capu::int32_t value = 5;
+ capu::uint32_t value = 5;
EXPECT_TRUE(buffer->put(NULL, 32) == ETCH_EINVAL);
//try to read an integer
EXPECT_TRUE(buffer->getInteger(value) == ETCH_ERANGE);
Modified:
etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp
URL:
http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp
(original)
+++ etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp
Thu Mar 20 09:05:30 2014
@@ -28,6 +28,9 @@
#include "transport/EtchSessionPacket.h"
#include "transport/EtchFlexBuffer.h"
+using testing::_;
+using testing::Invoke;
+
class MockListener3 : public virtual EtchSessionListener<EtchSocket> {
public:
@@ -145,26 +148,413 @@ TEST_F(EtchPacketizerTest, TransportPack
delete packetizer;
}
-TEST_F(EtchPacketizerTest, SessionDataTest) {
- EtchURL u("tcp://127.0.0.1:4001");
+class MessagizerCorruptedCheck : public EtchSessionPacket {
+public:
+
+ status_t sessionPacket(capu::SmartPointer<EtchWho> receipent,
capu::SmartPointer<EtchFlexBuffer> buf)
+ {
+ capu::uint32_t val;
+
+ buf->getInteger(val);
+ EXPECT_EQ(11, val);
+
+ buf->getInteger(val);
+ EXPECT_EQ(1122, val);
+
+ buf->getInteger(val);
+ EXPECT_EQ(112233, val);
+
+ buf->getInteger(val);
+ EXPECT_EQ(11223344, val);
+ return ETCH_OK;
+ }
+
+ status_t sessionQuery (capu::SmartPointer<EtchObject> query,
capu::SmartPointer<EtchObject> &result)
+ {
+ return ETCH_OK;
+ }
+
+ status_t sessionControl(capu::SmartPointer<EtchObject> control,
capu::SmartPointer<EtchObject> value)
+ {
+ return ETCH_OK;
+ }
+
+ status_t sessionNotify(capu::SmartPointer<EtchObject> event) {
+ return ETCH_OK;
+ }
+};
+
+class MockMessagizerCorrupted : public EtchSessionPacket {
+public:
+ MockMessagizerCorrupted() {
+ // By default, all calls are delegated to the real object.
+ ON_CALL(*this, sessionPacket(_, _)).WillByDefault(Invoke(&real_,
&MessagizerCorruptedCheck::sessionPacket));
+ }
+ MOCK_METHOD2(sessionPacket, status_t(capu::SmartPointer<EtchWho>
receipent, capu::SmartPointer<EtchFlexBuffer> buf));
+
+ MOCK_METHOD2(sessionQuery, status_t(capu::SmartPointer<EtchObject> query,
capu::SmartPointer<EtchObject> &result));
+
+ MOCK_METHOD2(sessionControl, status_t(capu::SmartPointer<EtchObject>
control, capu::SmartPointer<EtchObject> value));
+
+ status_t sessionNotify(capu::SmartPointer<EtchObject> event) {
+ return ETCH_OK;
+ }
+private:
+ MessagizerCorruptedCheck real_;
+};
+
+
+TEST_F(EtchPacketizerTest, SinglePackagesTest) {
+ EtchURL u("tcp://127.0.0.1:4001");
+ EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+ EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+ capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+ EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+ packetizer->setSession(mSessionPacker);
+
+ EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_,
_)).Times(1); //Expect two Clean packages
+
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 24);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ packetizer->setSession(NULL);
+ delete mSessionPacker;
+ delete packetizer;
+ delete conn;
+}
+
+TEST_F(EtchPacketizerTest, MutliplePackagesTest) {
+ EtchURL u("tcp://127.0.0.1:4001");
+ EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+ EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+ capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+ EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+ packetizer->setSession(mSessionPacker);
+
+ EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_,
_)).Times(3); //Expect two Clean packages
+
+ //Build up a Correct Package and Attach a damaged Header behind it, which
should lead to a bad SIG in the next loop
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 72);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ packetizer->setSession(NULL);
+ delete mSessionPacker;
+ delete packetizer;
+ delete conn;
+}
+
+TEST_F(EtchPacketizerTest, SplitPackagesTest) {
+ EtchURL u("tcp://127.0.0.1:4001");
+ EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+ EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+ capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+ EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+ packetizer->setSession(mSessionPacker);
+
+ EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_,
_)).Times(3); //Expect two Clean packages
+
+ //Build up a Correct Package and Attach a damaged Header behind it, which
should lead to a bad SIG in the next loop
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+
+
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 36);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ buffer->reset();
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 36);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ packetizer->setSession(NULL);
+ delete mSessionPacker;
+ delete packetizer;
+ delete conn;
+}
+
+TEST_F(EtchPacketizerTest, SplitHeaderTest) {
+ EtchURL u("tcp://127.0.0.1:4001");
+ EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+ EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+ capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+ EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+ packetizer->setSession(mSessionPacker);
+
+ EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_,
_)).Times(3); //Expect two Clean packages
+
+ //Build up a Correct Package and Attach a damaged Header behind it, which
should lead to a bad SIG in the next loop
+ buffer->putInt(packetizer->SIG());
+
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 4);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ buffer->reset();
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 68);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ packetizer->setSession(NULL);
+ delete mSessionPacker;
+ delete packetizer;
+ delete conn;
+}
+
+TEST_F(EtchPacketizerTest, SplitMultiplePackagesTest) {
+ EtchURL u("tcp://127.0.0.1:4001");
+ EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+ EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+ capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+ EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+ packetizer->setSession(mSessionPacker);
+
+ EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_,
_)).Times(3); //Expect two Clean packages
+
+ //Build up a Correct Package and Attach a damaged Header behind it, which
should lead to a bad SIG in the next loop
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+
+
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 36);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ buffer->reset();
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 8);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ buffer->reset();
+ buffer->putInt(11223344);
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 28);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ packetizer->setSession(NULL);
+ delete mSessionPacker;
+ delete packetizer;
+ delete conn;
+}
+
+
+
+TEST_F(EtchPacketizerTest, CorruptedPackagesTest_OnePacket) {
+ EtchURL u("tcp://127.0.0.1:4001");
+ EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+ EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+ capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+ EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+ packetizer->setSession(mSessionPacker);
+
+ EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_,
_)).Times(2); //Expect two Clean packages
+
+ //Build up a Correct Package and Attach a damaged Header behind it, which
should lead to a bad SIG in the next loop
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ //CorruptedPackage Header
+ buffer->putShort(0xBEEFu); //SIG = DEADBEEF
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 26);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ buffer->reset();
+ buffer->putShort(0xDEADu); //Header SIG is now BEEFDEAD => wrong
+ buffer->put((capu::int8_t*)"123456789123456789123456789", 27); //some bad
Data
+ buffer->putInt(packetizer->SIG()); //a fine package behind the corrupted
one
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 53);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ packetizer->setSession(NULL);
+ delete mSessionPacker;
+ delete packetizer;
+ delete conn;
+}
+
+
+TEST_F(EtchPacketizerTest, CorruptedPackagesTest_MultiplePacket) {
+ EtchURL u("tcp://127.0.0.1:4001");
+ EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+ EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+ capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+ EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+ packetizer->setSession(mSessionPacker);
+
+ EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_,
_)).Times(2); //Expect two Clean packages
+
+ //Build up a Correct Package and Attach a damaged Header behind it, which
should lead to a bad SIG in the next loop
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ //CorruptedPackage Header
+ buffer->putShort(0xBEEFu); //SIG = DEADBEEF
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 26);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ buffer->reset();
+ buffer->putShort(0xDEADu); //Header SIG is now BEEFDEAD => wrong
+ buffer->put((capu::int8_t*)"123456789123456789123456789", 27); //some bad
Data
+ buffer->putInt(0xBEEFDEAD);
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 33);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ buffer->reset();
+ buffer->putShort(0xDEADu); //Header SIG is now DEADDEAD => wrong
+ buffer->put((capu::int8_t*)"123456789123456789123456789", 27); //some bad
Data
+ buffer->putInt(0xBEEFDEAD); //Half the Header "DEAD" = the beginning of
the correct Header
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 33);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ buffer->reset();
+ buffer->putShort(0xBEEFu); //Fine with previous data
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->setIndex(0);
+ EXPECT_TRUE(buffer->getLength() == 22);
+ EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
+ packetizer->setSession(NULL);
+ delete mSessionPacker;
+ delete packetizer;
+ delete conn;
+}
+
+
+TEST_F(EtchPacketizerTest, PacketToLargeTest) {
+ EtchURL u("tcp://127.0.0.1:4001?Packetizer.maxPktSize=18");
EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+
capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
- //A packet is created
- capu::int32_t pktsize = 4;
+ EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+ packetizer->setSession(mSessionPacker);
+
+ EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_,
_)).Times(2); //Expect two Clean packages
+
+ //Build up a Correct Package and Attach a damaged Header behind it, which
should lead to a bad SIG in the next loop
buffer->putInt(packetizer->SIG());
- buffer->putInt(pktsize);
- buffer->put((capu::int8_t *)"test", pktsize);
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ //CorruptedPackage Header
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(20); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
+ buffer->putInt(1122334455);
+ buffer->putInt(packetizer->SIG());
+ buffer->putInt(16); //size
+ buffer->putInt(11);
+ buffer->putInt(1122);
+ buffer->putInt(112233);
+ buffer->putInt(11223344);
buffer->setIndex(0);
- EXPECT_TRUE(buffer->getLength() == 12);
-
- EtchSessionPacket* mSessionPacker = new MockMessagizer();
- packetizer->setSession(mSessionPacker);
-
- MockMessagizer * mock = (MockMessagizer*) mSessionPacker;
- EXPECT_CALL(*mock, sessionPacket(capu::SmartPointer<EtchWho > (NULL),
buffer));
+ EXPECT_TRUE(buffer->getLength() == 76);
EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+
packetizer->setSession(NULL);
delete mSessionPacker;
delete packetizer;