Author: aconway
Date: Mon Jan 14 19:08:53 2013
New Revision: 1433055
URL: http://svn.apache.org/viewvc?rev=1433055&view=rev
Log:
QPID-4514: Clean up cluster code: SessionImpl
Clean up obsolete code in SessionImpl class used only by defunct cluster code:
- Remove SessionImpl::send reframe parameter.
- Remove doClearDeliveryPropertiesExchange flag.
- Remove disableAutoDetach and autoDetach flag.
Modified:
qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=1433055&r1=1433054&r2=1433055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Jan 14 19:08:53 2013
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,9 +61,7 @@ SessionImpl::SessionImpl(const std::stri
ioHandler(*this),
proxy(ioHandler),
nextIn(0),
- nextOut(0),
- doClearDeliveryPropertiesExchange(true),
- autoDetach(true)
+ nextOut(0)
{
channel.next = connection.get();
}
@@ -72,12 +70,10 @@ SessionImpl::~SessionImpl() {
{
Lock l(state);
if (state != DETACHED && state != DETACHING) {
- if (autoDetach) {
- QPID_LOG(warning, "Session was not closed cleanly: " << id);
- // Inform broker but don't wait for detached as that deadlocks.
- // The detached will be ignored as the channel will be invalid.
- try { detach(); } catch (...) {} // ignore errors.
- }
+ QPID_LOG(warning, "Session was not closed cleanly: " << id);
+ // Inform broker but don't wait for detached as that deadlocks.
+ // The detached will be ignored as the channel will be invalid.
+ try { detach(); } catch (...) {} // ignore errors.
setState(DETACHED);
handleClosed();
state.waitWaiters();
@@ -136,10 +132,10 @@ void SessionImpl::resume(boost::shared_p
void SessionImpl::suspend() //user thread
{
Lock l(state);
- detach();
+ detach();
}
-void SessionImpl::detach() //call with lock held
+void SessionImpl::detach() //call with lock held
{
if (state == ATTACHED) {
setState(DETACHING);
@@ -149,8 +145,8 @@ void SessionImpl::detach() //call with l
uint16_t SessionImpl::getChannel() const // user thread
-{
- return channel;
+{
+ return channel;
}
void SessionImpl::setChannel(uint16_t c) // user thread
@@ -182,7 +178,7 @@ void SessionImpl::waitForCompletionImpl(
bool SessionImpl::isComplete(const SequenceNumber& id)
{
- Lock l(state);
+ Lock l(state);
return !incompleteOut.contains(id);
}
@@ -219,7 +215,7 @@ framing::SequenceNumber SessionImpl::get
return --firstIncomplete;
}
-struct MarkCompleted
+struct MarkCompleted
{
const SequenceNumber& id;
SequenceSet& completedIn;
@@ -230,7 +226,7 @@ struct MarkCompleted
{
if (id >= end) {
completedIn.add(start, end);
- } else if (id >= start) {
+ } else if (id >= start) {
completedIn.add(start, id);
}
}
@@ -244,13 +240,13 @@ void SessionImpl::markCompleted(const Se
completedIn.add(ids);
if (notifyPeer) {
sendCompletion();
- }
+ }
}
void SessionImpl::markCompleted(const SequenceNumber& id, bool cumulative,
bool notifyPeer)
{
Lock l(state);
- if (cumulative) {
+ if (cumulative) {
//everything in incompleteIn less than or equal to id is now complete
MarkCompleted f(id, completedIn);
incompleteIn.for_each(f);
@@ -260,11 +256,11 @@ void SessionImpl::markCompleted(const Se
incompleteIn.remove(completedIn);
} else if (incompleteIn.contains(id)) {
incompleteIn.remove(id);
- completedIn.add(id);
+ completedIn.add(id);
}
if (notifyPeer) {
sendCompletion();
- }
+ }
}
void SessionImpl::setException(const sys::ExceptionHolder& ex) {
@@ -310,42 +306,24 @@ namespace {
struct SendContentFn {
FrameHandler& handler;
void operator()(const AMQFrame& f) {
- if (!f.getMethod())
+ if (!f.getMethod())
handler(const_cast<AMQFrame&>(f));
}
SendContentFn(FrameHandler& h) : handler(h) {}
};
-// Adaptor to make FrameSet look like MethodContent; used in cluster update client
-struct MethodContentAdaptor : MethodContent
-{
- AMQHeaderBody header;
- const std::string content;
-
- MethodContentAdaptor(const FrameSet& f) : header(*f.getHeaders()), content(f.getContent()) {}
-
- const AMQHeaderBody& getHeader() const
- {
- return header;
- }
- const std::string& getData() const
- {
- return content;
- }
-};
-
}
-
-Future SessionImpl::send(const AMQBody& command, const FrameSet& content, bool reframe) {
+
+Future SessionImpl::send(const AMQBody& command, const FrameSet& content) {
Acquire a(sendLock);
SequenceNumber id = nextOut++;
{
Lock l(state);
- checkOpen();
+ checkOpen();
incompleteOut.add(id);
}
Future f(id);
- if (command.getMethod()->resultExpected()) {
+ if (command.getMethod()->resultExpected()) {
Lock l(state);
//result listener must be set before the command is sent
f.setFutureResult(results.listenForResult(id));
@@ -353,14 +331,8 @@ Future SessionImpl::send(const AMQBody&
AMQFrame frame(command);
frame.setEof(false);
handleOut(frame);
-
- if (reframe) {
- MethodContentAdaptor c(content);
- sendContent(c);
- } else {
- SendContentFn send(out);
- content.map(send);
- }
+ SendContentFn send(out);
+ content.map(send);
return f;
}
@@ -375,11 +347,11 @@ Future SessionImpl::sendCommand(const AM
SequenceNumber id = nextOut++;
{
Lock l(state);
- checkOpen();
+ checkOpen();
incompleteOut.add(id);
}
Future f(id);
- if (command.getMethod()->resultExpected()) {
+ if (command.getMethod()->resultExpected()) {
Lock l(state);
//result listener must be set before the command is sent
f.setFutureResult(results.listenForResult(id));
@@ -399,23 +371,13 @@ void SessionImpl::sendContent(const Meth
{
AMQFrame header(content.getHeader());
- // doClearDeliveryPropertiesExchange is set by cluster update client so
- // it can send messages with delivery-properties.exchange set.
- //
- if (doClearDeliveryPropertiesExchange) {
- // Normal client is not allowed to set the delivery-properties.exchange
- // so clear it here.
- AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody());
- if (headerp && headerp->get<DeliveryProperties>())
- headerp->get<DeliveryProperties>(true)->clearExchangeFlag();
- }
header.setFirstSegment(false);
uint64_t data_length = content.getData().length();
if(data_length > 0){
header.setLastSegment(false);
- handleOut(header);
+ handleOut(header);
/*Note: end of frame marker included in overhead but not in size*/
- const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
+ const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
if(data_length < frag_size){
AMQFrame frame((AMQContentBody(content.getData())));
@@ -442,7 +404,7 @@ void SessionImpl::sendContent(const Meth
}
}
} else {
- handleOut(header);
+ handleOut(header);
}
}
@@ -462,7 +424,7 @@ bool isContentFrame(AMQFrame& frame)
{
AMQBody* body = frame.getBody();
uint8_t type = body->type();
- return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
+ return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
}
void SessionImpl::handleIn(AMQFrame& frame) // network thread
@@ -585,7 +547,7 @@ void SessionImpl::timeout(uint32_t t)
void SessionImpl::commandPoint(const framing::SequenceNumber& id, uint64_t offset)
{
if (offset) throw NotImplementedException("Non-zero byte offset not yet supported for command-point");
-
+
Lock l(state);
nextIn = id;
}
@@ -677,10 +639,10 @@ void SessionImpl::exception(uint16_t err
{
Lock l(state);
setExceptionLH(createSessionException(errorCode, description));
- QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what()
+ QPID_LOG(warning, "Exception received from broker: " << exceptionHolder.what()
<< " [caused by " << commandId << " " << classCode << ":" << commandCode << "]");
- if (detachedLifetime)
+ if (detachedLifetime)
setTimeout(0);
}
@@ -748,6 +710,4 @@ boost::shared_ptr<ConnectionImpl> Sessio
return connection;
}
-void SessionImpl::disableAutoDetach() { autoDetach = false; }
-
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=1433055&r1=1433054&r2=1433055&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Jan 14 19:08:53 2013
@@ -87,15 +87,7 @@ public:
Future send(const framing::AMQBody& command);
Future send(const framing::AMQBody& command, const framing::MethodContent& content);
- /**
- * This method takes the content as a FrameSet; if reframe=false,
- * the caller is resposnible for ensuring that the header and
- * content frames in that set are correct for this connection
- * (right flags, right fragmentation etc). If reframe=true, then
- * the header and content from the frameset will be copied and
- * reframed correctly for the connection.
- */
- QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content, bool reframe=false);
+ QPID_CLIENT_EXTERN Future send(const framing::AMQBody& command, const framing::FrameSet& content);
void sendRawFrame(framing::AMQFrame& frame);
Demux& getDemux();
@@ -125,11 +117,6 @@ public:
*/
boost::shared_ptr<ConnectionImpl> getConnection();
- void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; }
-
- /** Suppress sending detach in destructor. Used by cluster to build session state */
- void disableAutoDetach();
-
private:
enum State {
INACTIVE,
@@ -225,10 +212,6 @@ private:
SessionState sessionState;
- bool doClearDeliveryPropertiesExchange;
-
- bool autoDetach;
-
friend class client::SessionHandler;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]