Author: aconway
Date: Wed Dec  1 21:33:12 2010
New Revision: 1041181

URL: http://svn.apache.org/viewvc?rev=1041181&view=rev
Log:
Modified cluster_tests causes broker shut down with invalid-argument error.

Described in https://bugzilla.redhat.com/show_bug.cgi?id=655078.  The
management agent's deleted-object list was not being replicated to new
members joining the cluster, so management generated fewer deleted
object notifications on the newer member, causing it to fail with an
invalid-argument error. The list is now being replicated correctly.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=1041181&r1=1041180&r2=1041181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Wed Dec  1 21:33:12 2010
@@ -93,7 +93,9 @@ cluster_la_SOURCES =                          \
   qpid/cluster/SecureConnectionFactory.h        \
   qpid/cluster/SecureConnectionFactory.cpp      \
   qpid/cluster/StoreStatus.h                   \
-  qpid/cluster/StoreStatus.cpp
+  qpid/cluster/StoreStatus.cpp                 \
+  qpid/cluster/UpdateDataExchange.h            \
+  qpid/cluster/UpdateDataExchange.cpp
 
 cluster_la_LIBADD=  -lcpg $(libcman) libqpidbroker.la libqpidclient.la
 cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1041181&r1=1041180&r2=1041181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Dec  1 21:33:12 2010
@@ -127,6 +127,7 @@
 #include "qpid/cluster/UpdateClient.h"
 #include "qpid/cluster/RetractClient.h"
 #include "qpid/cluster/FailoverExchange.h"
+#include "qpid/cluster/UpdateDataExchange.h"
 #include "qpid/cluster/UpdateExchange.h"
 #include "qpid/cluster/ClusterTimer.h"
 
@@ -197,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 964709;
+const uint32_t Cluster::CLUSTER_VERSION = 1039478;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -289,6 +290,10 @@ Cluster::Cluster(const ClusterSettings& 
     // without modifying delivery-properties.exchange.
     broker.getExchanges().registerExchange(
         boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+    // Update-data exchange is used for passing data that may be too large
+    // for single control frame.
+    broker.getExchanges().registerExchange(
+        boost::shared_ptr<broker::Exchange>(new UpdateDataExchange(this, 
broker.getManagementAgent())));
 
     // Load my store status before we go into initialization
     if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1041181&r1=1041180&r2=1041181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Dec  1 21:33:12 2010
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include "qpid/amqp_0_10/Codecs.h"
 #include "Connection.h"
 #include "UpdateClient.h"
 #include "Cluster.h"
@@ -42,6 +43,7 @@
 #include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/ClusterSafe.h"
+#include "qpid/types/Variant.h"
 #include "qpid/management/ManagementAgent.h"
 #include <boost/current_function.hpp>
 
@@ -51,7 +53,8 @@ namespace cluster {
 
 using namespace framing;
 using namespace framing::cluster;
-
+using amqp_0_10::ListCodec;
+using types::Variant;
 
 qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
 
@@ -626,15 +629,6 @@ void Connection::addQueueListener(const 
     
findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
 }
 
-void Connection::managementSchema(const std::string& data) {
-    management::ManagementAgent* agent = 
cluster.getBroker().getManagementAgent();
-    if (!agent)
-        throw Exception(QPID_MSG("Management schema update but management not 
enabled."));
-    framing::Buffer buf(const_cast<char*>(data.data()), data.size());
-    agent->importSchemas(buf);
-    QPID_LOG(debug, cluster << " updated management schemas");
-}
-
 //
 // This is the handler for incoming managementsetup messages.
 //
@@ -648,15 +642,5 @@ void Connection::managementSetupState(ui
     agent->setNextObjectId(objectNum);
     agent->setBootSequence(bootSequence);
 }
-
-void Connection::managementAgents(const std::string& data) {
-    management::ManagementAgent* agent = 
cluster.getBroker().getManagementAgent();
-    if (!agent)
-        throw Exception(QPID_MSG("Management agent update but management not 
enabled."));
-    framing::Buffer buf(const_cast<char*>(data.data()), data.size());
-    agent->importAgents(buf);
-    QPID_LOG(debug, cluster << " updated management agents");
-}
-
 }} // Namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1041181&r1=1041180&r2=1041181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Dec  1 21:33:12 2010
@@ -177,8 +177,6 @@ class Connection :
     OutputInterceptor& getOutput() { return output; }
 
     void addQueueListener(const std::string& queue, uint32_t listener);
-    void managementSchema(const std::string& data);
-    void managementAgents(const std::string& data);
     void managementSetupState(uint64_t objectNum, uint16_t bootSequence);
     void setSecureConnection ( broker::SecureConnection * sc );
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1041181&r1=1041180&r2=1041181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Dec  1 21:33:12 
2010
@@ -18,12 +18,14 @@
  * under the License.
  *
  */
+#include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/cluster/UpdateClient.h"
 #include "qpid/cluster/Cluster.h"
 #include "qpid/cluster/ClusterMap.h"
 #include "qpid/cluster/Connection.h"
 #include "qpid/cluster/Decoder.h"
 #include "qpid/cluster/ExpiryPolicy.h"
+#include "qpid/cluster/UpdateDataExchange.h"
 #include "qpid/client/SessionBase_0_10Access.h" 
 #include "qpid/client/ConnectionAccess.h" 
 #include "qpid/client/SessionImpl.h" 
@@ -52,6 +54,7 @@
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/framing/TypeCode.h"
 #include "qpid/log/Statement.h"
+#include "qpid/types/Variant.h"
 #include "qpid/Url.h"
 #include "qmf/org/apache/qpid/broker/ManagementSetupState.h"
 #include <boost/bind.hpp>
@@ -62,12 +65,14 @@
 namespace qpid {
 namespace cluster {
 
+using amqp_0_10::ListCodec;
 using broker::Broker;
 using broker::Exchange;
 using broker::Queue;
 using broker::QueueBinding;
 using broker::Message;
 using broker::SemanticState;
+using types::Variant;
 
 using namespace framing;
 namespace arg=client::arg;
@@ -153,7 +158,6 @@ void UpdateClient::update() {
     std::for_each(connections.begin(), connections.end(),
                   boost::bind(&UpdateClient::updateConnection, this, _1));
     session.queueDelete(arg::queue=UPDATE);
-    session.close();
 
     // Update queue listeners: must come after sessions so consumerNumbering 
is populated
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, 
this, _1));
@@ -162,14 +166,16 @@ void UpdateClient::update() {
 
     updateManagementAgent();
 
+    session.close();
+
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
     AMQFrame frame(membership);
     client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), 
false);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
 
-    // FIXME aconway 2010-06-16: Connection will be closed from the other end.
-    // connection.close();
+    // NOTE: connection will be closed from the other end, don't close
+    // it here as that causes a race.
     
     // FIXME aconway 2010-03-15: This sleep avoids the race condition
     // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
@@ -221,12 +227,34 @@ void UpdateClient::updateManagementAgent
 {
     management::ManagementAgent* agent = updaterBroker.getManagementAgent();
     if (!agent) return;
-    // Send management schemas and agents.
     string data;
+
+    QPID_LOG(debug, updaterId << " updating management schemas. ")
     agent->exportSchemas(data);
-    ClusterConnectionProxy(session).managementSchema(data);
+    session.messageTransfer(
+        arg::content=client::Message(data, 
UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY),
+        arg::destination=UpdateDataExchange::EXCHANGE_NAME);
+
+    QPID_LOG(debug, updaterId << " updating management agents. ")
     agent->exportAgents(data);
-    ClusterConnectionProxy(session).managementAgents(data);
+    session.messageTransfer(
+        arg::content=client::Message(data, 
UpdateDataExchange::MANAGEMENT_AGENTS_KEY),
+        arg::destination=UpdateDataExchange::EXCHANGE_NAME);
+
+    QPID_LOG(debug, updaterId << " updating management deleted objects. ")
+    typedef management::ManagementAgent::DeletedObjectList DeletedObjectList;
+    DeletedObjectList deleted;
+    agent->exportDeletedObjects(deleted);
+    Variant::List list;
+    for (DeletedObjectList::iterator i = deleted.begin(); i != deleted.end(); 
++i) {
+        string encoded;
+        (*i)->encode(encoded);
+        list.push_back(encoded);
+    }
+    ListCodec::encode(list, data);
+    session.messageTransfer(
+        arg::content=client::Message(data, 
UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY),
+        arg::destination=UpdateDataExchange::EXCHANGE_NAME);
 }
 
 void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp?rev=1041181&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp Wed Dec  1 
21:33:12 2010
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "UpdateDataExchange.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace cluster {
+
+const std::string 
UpdateDataExchange::EXCHANGE_NAME("qpid.cluster-update-data");
+const std::string 
UpdateDataExchange::EXCHANGE_TYPE("qpid.cluster-update-data");
+const std::string 
UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents");
+const std::string 
UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
+const std::string 
UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
+
+UpdateDataExchange::UpdateDataExchange(management::Manageable* parent,
+                                       management::ManagementAgent* agent_) :
+    Exchange(EXCHANGE_NAME, parent),
+    agent(agent_)
+{}
+
+void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& 
routingKey,
+                               const qpid::framing::FieldTable* )
+{
+    std::string data = msg.getMessage().getFrames().getContent();
+    if (routingKey == MANAGEMENT_AGENTS_KEY)
+        managementAgents(data);
+    else if (routingKey == MANAGEMENT_SCHEMAS_KEY)
+        managementSchemas(data);
+    else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY)
+        managementDeletedObjects(data);
+    else
+        throw Exception(
+            QPID_MSG("Cluster update-data exchange received unknown 
routing-key: "
+                     << routingKey));
+}
+
+void UpdateDataExchange::managementAgents(const std::string& data) {
+    if (!agent)
+        throw Exception(
+            QPID_MSG("Received management agent update but management is 
disabled."));
+    framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+    agent->importAgents(buf);
+    QPID_LOG(debug, " Updated management agents.");
+}
+
+void UpdateDataExchange::managementSchemas(const std::string& data) {
+    if (!agent)
+        throw Exception(
+            QPID_MSG("Received management schema update but management is 
disabled."));
+    framing::Buffer buf(const_cast<char*>(data.data()), data.size());
+    agent->importSchemas(buf);
+    QPID_LOG(debug, " Updated management schemas");
+}
+
+void UpdateDataExchange::managementDeletedObjects(const std::string& data) {
+    using amqp_0_10::ListCodec;
+    using types::Variant;
+    if (!agent)
+        throw Exception(
+            QPID_MSG("Management agent update but management not enabled."));
+    Variant::List encoded;
+    ListCodec::decode(data, encoded);
+    management::ManagementAgent::DeletedObjectList objects;
+    for (Variant::List::iterator i = encoded.begin(); i != encoded.end(); ++i) 
{
+        
objects.push_back(management::ManagementAgent::DeletedObject::shared_ptr(
+                              new 
management::ManagementAgent::DeletedObject(*i)));
+    }
+    agent->importDeletedObjects(objects);
+    QPID_LOG(debug, " Updated management deleted objects.");
+}
+
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h?rev=1041181&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h Wed Dec  1 
21:33:12 2010
@@ -0,0 +1,81 @@
+#ifndef QPID_CLUSTER_UPDATEDATAEXCHANGE_H
+#define QPID_CLUSTER_UPDATEDATAEXCHANGE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Exchange.h"
+
+namespace qpid {
+
+namespace management {
+class ManagementAgent;
+}
+
+namespace cluster {
+
+/**
+ * An exchange used to send data that is to large for a control
+ * during update. The routing key indicates the type of data.
+ */
+class UpdateDataExchange : public broker::Exchange
+{
+  public:
+    static const std::string EXCHANGE_NAME;
+    static const std::string EXCHANGE_TYPE;
+    static const std::string MANAGEMENT_AGENTS_KEY;
+    static const std::string MANAGEMENT_SCHEMAS_KEY;
+    static const std::string MANAGEMENT_DELETED_OBJECTS_KEY;
+
+    UpdateDataExchange(management::Manageable* parent, 
management::ManagementAgent*);
+
+    void route(broker::Deliverable& msg, const std::string& routingKey,
+               const framing::FieldTable* args);
+
+    // Not implemented
+    std::string getType() const { return EXCHANGE_TYPE; }
+
+    bool bind(boost::shared_ptr<broker::Queue>,
+              const std::string&,
+              const qpid::framing::FieldTable*)
+    { return false; }
+
+    bool unbind(boost::shared_ptr<broker::Queue>,
+                const std::string&,
+                const qpid::framing::FieldTable*)
+    { return false; }
+
+    bool isBound(boost::shared_ptr<broker::Queue>,
+                 const std::string*,
+                 const qpid::framing::FieldTable*)
+    { return false; }
+
+  private:
+    management::ManagementAgent* agent;
+
+    void managementAgents(const std::string&);
+    void managementSchemas(const std::string&);
+    void managementDeletedObjects(const std::string&);
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_UPDATEDATAEXCHANGE_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateDataExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1041181&r1=1041180&r2=1041181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Wed Dec  1 
21:33:12 2010
@@ -655,6 +655,11 @@ void ManagementAgent::periodicProcessing
          iter != managementObjects.end();
          iter++) {
         ManagementObject* object = iter->second;
+
+        if (object->isDeleted()) {
+            deleteList.push_back(pair<ObjectId, 
ManagementObject*>(iter->first, object));
+        }
+
         object->setFlags(0);
         if (clientWasAdded) {
             object->setForcePublish(true);
@@ -663,6 +668,52 @@ void ManagementAgent::periodicProcessing
 
     clientWasAdded = false;
 
+    // Remove Deleted objects, and save for later publishing...
+    //
+    for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = 
deleteList.rbegin();
+         iter != deleteList.rend();
+         iter++) {
+
+        ManagementObject* delObj = iter->second;
+        DeletedObject::shared_ptr dptr(new DeletedObject());
+        std::string classkey(delObj->getPackageName() + std::string(":") + 
delObj->getClassName());
+        bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || 
delObj->getForcePublish()));
+
+        dptr->packageName = delObj->getPackageName();
+        dptr->className = delObj->getClassName();
+        delObj->getObjectId().encode(dptr->objectId);
+
+        if (qmf1Support) {
+            delObj->writeProperties(dptr->encodedV1Config);
+            if (send_stats) {
+                delObj->writeStatistics(dptr->encodedV1Inst);
+            }
+        }
+
+        if (qmf2Support) {
+            Variant::Map map_;
+            Variant::Map values;
+            Variant::Map oid;
+
+            delObj->getObjectId().mapEncode(oid);
+            map_["_object_id"] = oid;
+            map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(),
+                                                   delObj->getClassName(),
+                                                   "_data",
+                                                   delObj->getMd5Sum());
+            delObj->writeTimestamps(map_);
+            delObj->mapEncodeValues(values, true, send_stats);
+            map_["_values"] = values;
+
+            dptr->encodedV2 = map_;
+        }
+
+        pendingDeletedObjs[classkey].push_back(dptr);
+
+        delete iter->second;
+        managementObjects.erase(iter->first);
+    }
+
     //
     // Process the entire object map.  Remember: we drop the userLock each 
time we call
     // sendBuffer().  This allows the managementObjects map to be altered 
during the
@@ -711,7 +762,13 @@ void ManagementAgent::periodicProcessing
                 if (object->getConfigChanged() || object->getInstChanged())
                     object->setUpdateTime();
 
-                send_props = (object->getConfigChanged() || 
object->getForcePublish() || object->isDeleted());
+                // skip any objects marked deleted since our first pass.  Deal 
with them
+                // on the next periodic cycle...
+                if (object->isDeleted()) {
+                    continue;
+                }
+
+                send_props = (object->getConfigChanged() || 
object->getForcePublish());
                 send_stats = (object->hasInst() && (object->getInstChanged() 
|| object->getForcePublish()));
 
                 if (send_props && qmf1Support) {
@@ -749,8 +806,6 @@ void ManagementAgent::periodicProcessing
                 if (send_props) pcount++;
                 if (send_stats) scount++;
 
-                if (object->isDeleted())
-                    deleteList.push_back(pair<ObjectId, 
ManagementObject*>(iter->first, object));
                 object->setForcePublish(false);
 
                 if ((qmf1Support && (msgBuffer.available() < HEADROOM)) ||
@@ -796,12 +851,114 @@ void ManagementAgent::periodicProcessing
         }
     }  // end processing updates for all objects
 
-    // Delete flagged objects
-    for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = 
deleteList.rbegin();
-         iter != deleteList.rend();
-         iter++) {
-        delete iter->second;
-        managementObjects.erase(iter->first);
+
+    // now send the pending deletes.  Make a temporary copy of the pending 
deletes so dropping the
+    // lock when the buffer is sent is safe.
+    //
+    if (!pendingDeletedObjs.empty()) {
+        PendingDeletedObjsMap tmp(pendingDeletedObjs);
+        pendingDeletedObjs.clear();
+
+        for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != 
tmp.end(); mIter++) {
+            std::string packageName;
+            std::string className;
+            Buffer msgBuffer(msgChars, BUFSIZE);
+            uint32_t v1Objs = 0;
+            uint32_t v2Objs = 0;
+            Variant::List list_;
+
+            size_t pos = mIter->first.find(":");
+            packageName = mIter->first.substr(0, pos);
+            className = mIter->first.substr(pos+1);
+
+            for (DeletedObjectList::iterator lIter = mIter->second.begin();
+                 lIter != mIter->second.end(); lIter++) {
+
+                if (!(*lIter)->encodedV1Config.empty()) {
+                    encodeHeader(msgBuffer, 'c');
+                    msgBuffer.putRawData((*lIter)->encodedV1Config);
+                    v1Objs++;
+                }
+                if (!(*lIter)->encodedV1Inst.empty()) {
+                    encodeHeader(msgBuffer, 'i');
+                    msgBuffer.putRawData((*lIter)->encodedV1Inst);
+                    v1Objs++;
+                }
+                if (v1Objs && msgBuffer.available() < HEADROOM) {
+                    v1Objs = 0;
+                    contentSize = BUFSIZE - msgBuffer.available();
+                    stringstream key;
+                    key << "console.obj.1.0." << packageName << "." << 
className;
+                    msgBuffer.reset();
+                    sendBufferLH(msgBuffer, contentSize, mExchange, 
key.str());   // UNLOCKS USERLOCK
+                    QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) 
to=" << key.str());
+                }
+
+                if (!(*lIter)->encodedV2.empty()) {
+                    list_.push_back((*lIter)->encodedV2);
+                    if (++v2Objs >= maxV2ReplyObjs) {
+                        v2Objs = 0;
+
+                        string content;
+                        ListCodec::encode(list_, content);
+                        list_.clear();
+                        if (content.length()) {
+                            stringstream key;
+                            Variant::Map  headers;
+                            key << "agent.ind.data." << 
keyifyNameStr(packageName)
+                                << "." << keyifyNameStr(className)
+                                << "." << vendorNameKey
+                                << "." << productNameKey;
+                            if (!instanceNameKey.empty())
+                                key << "." << instanceNameKey;
+
+                            headers["method"] = "indication";
+                            headers["qmf.opcode"] = "_data_indication";
+                            headers["qmf.content"] = "_data";
+                            headers["qmf.agent"] = name_address;
+
+                            sendBufferLH(content, "", headers, "amqp/list", 
v2Topic, key.str());  // UNLOCKS USERLOCK
+                            QPID_LOG(trace, "SEND Multicast ContentInd V2 
(delete) to=" << key.str() << " len=" << content.length());
+                        }
+                    }
+                }
+            }  // end current list
+
+            // send any remaining objects...
+
+            if (v1Objs) {
+                contentSize = BUFSIZE - msgBuffer.available();
+                stringstream key;
+                key << "console.obj.1.0." << packageName << "." << className;
+                msgBuffer.reset();
+                sendBufferLH(msgBuffer, contentSize, mExchange, key.str());   
// UNLOCKS USERLOCK
+                QPID_LOG(trace, "SEND V1 Multicast ContentInd V1 (delete) to=" 
<< key.str());
+            }
+
+            if (!list_.empty()) {
+                string content;
+                ListCodec::encode(list_, content);
+                list_.clear();
+                if (content.length()) {
+                    stringstream key;
+                    Variant::Map  headers;
+                    key << "agent.ind.data." << keyifyNameStr(packageName)
+                        << "." << keyifyNameStr(className)
+                        << "." << vendorNameKey
+                        << "." << productNameKey;
+                    if (!instanceNameKey.empty())
+                        key << "." << instanceNameKey;
+
+                    headers["method"] = "indication";
+                    headers["qmf.opcode"] = "_data_indication";
+                    headers["qmf.content"] = "_data";
+                    headers["qmf.agent"] = name_address;
+
+                    sendBufferLH(content, "", headers, "amqp/list", v2Topic, 
key.str());  // UNLOCKS USERLOCK
+                    QPID_LOG(trace, "SEND Multicast ContentInd V2 (delete) 
to=" << key.str() << " len=" << content.length());
+                }
+            }
+        }  // end map
     }
 
     if (!deleteList.empty()) {
@@ -2700,3 +2857,143 @@ Variant ManagementAgent::toVariant(const
     return out;
 }
 
+
+// Build up a list of the current set of deleted objects that are pending their
+// next (last) publish-ment.
+void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
+{
+    sys::Mutex::ScopedLock lock (userLock);
+    list<pair<ObjectId, ManagementObject*> > deleteList;
+
+    moveNewObjectsLH();
+
+    for (ManagementObjectMap::iterator iter = managementObjects.begin();
+         iter != managementObjects.end();
+         iter++) {
+        ManagementObject* object = iter->second;
+
+        if (object->isDeleted()) {
+            deleteList.push_back(pair<ObjectId, 
ManagementObject*>(iter->first, object));
+        }
+    }
+
+    // Remove Deleted objects, and save for later publishing...
+    //
+    for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = 
deleteList.rbegin();
+         iter != deleteList.rend();
+         iter++) {
+
+        ManagementObject* delObj = iter->second;
+        DeletedObject::shared_ptr dptr(new DeletedObject());
+        std::string classkey(delObj->getPackageName() + std::string(":") + 
delObj->getClassName());
+        bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || 
delObj->getForcePublish()));
+
+        dptr->packageName = delObj->getPackageName();
+        dptr->className = delObj->getClassName();
+        delObj->getObjectId().encode(dptr->objectId);
+
+        if (qmf1Support) {
+            delObj->writeProperties(dptr->encodedV1Config);
+            if (send_stats) {
+                delObj->writeStatistics(dptr->encodedV1Inst);
+            }
+        }
+
+        if (qmf2Support) {
+            Variant::Map map_;
+            Variant::Map values;
+            Variant::Map oid;
+
+            delObj->getObjectId().mapEncode(oid);
+            map_["_object_id"] = oid;
+            map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(),
+                                                   delObj->getClassName(),
+                                                   "_data",
+                                                   delObj->getMd5Sum());
+            delObj->writeTimestamps(map_);
+            delObj->mapEncodeValues(values, true, send_stats);
+            map_["_values"] = values;
+
+            dptr->encodedV2 = map_;
+        }
+
+        pendingDeletedObjs[classkey].push_back(dptr);
+
+        delete iter->second;
+        managementObjects.erase(iter->first);
+    }
+
+    // now copy the pending deletes into the outList
+
+    for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin();
+         mIter != pendingDeletedObjs.end(); mIter++) {
+        for (DeletedObjectList::iterator lIter = mIter->second.begin();
+             lIter != mIter->second.end(); lIter++) {
+            outList.push_back(*lIter);
+        }
+    }
+}
+
+
+// Merge this list's deleted objects to the management Agent's list of deleted
+// objects waiting for next (last) publish-ment.
+void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
+{
+    sys::Mutex::ScopedLock lock (userLock);
+
+    for (DeletedObjectList::const_iterator lIter = inList.begin(); lIter != 
inList.end(); lIter++) {
+
+        std::string classkey((*lIter)->packageName + std::string(":") + 
(*lIter)->className);
+        DeletedObjectList& dList = pendingDeletedObjs[classkey];
+
+        // not sure if this is necessary - merge by objectid....
+        bool found = false;
+        for (DeletedObjectList::iterator dIter = dList.begin(); dIter != 
dList.end(); dIter++) {
+            if ((*dIter)->objectId == (*lIter)->objectId) {
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            dList.push_back(*lIter);
+        }
+    }
+}
+
+
+// construct a DeletedObject from an encoded representation. Used by
+// clustering to move deleted objects between clustered brokers.  See
+// DeletedObject::encode() for the reverse.
+ManagementAgent::DeletedObject::DeletedObject(const std::string& encoded)
+{
+    qpid::types::Variant::Map map_;
+    MapCodec::decode(encoded, map_);
+
+    packageName = map_["_package_name"].getString();
+    className = map_["_class_name"].getString();
+    objectId = map_["_object_id"].getString();
+
+    encodedV1Config = map_["_v1_config"].getString();
+    encodedV1Inst = map_["_v1_inst"].getString();
+    encodedV2 = map_["_v2_data"].asMap();
+}
+
+
+// encode a DeletedObject to a string buffer. Used by
+// clustering to move deleted objects between clustered brokers.  See
+// DeletedObject(const std::string&) for the reverse.
+void ManagementAgent::DeletedObject::encode(std::string& toBuffer)
+{
+    qpid::types::Variant::Map map_;
+
+
+    map_["_package_name"] = packageName;
+    map_["_class_name"] = className;
+    map_["_object_id"] = objectId;
+
+    map_["_v1_config"] = encodedV1Config;
+    map_["_v1_inst"] = encodedV1Inst;
+    map_["_v2_data"] = encodedV2;
+
+    MapCodec::encode(map_, toBuffer);
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1041181&r1=1041180&r2=1041181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Wed Dec  1 
21:33:12 2010
@@ -148,6 +148,40 @@ public:
     static boost::shared_ptr<framing::FieldValue> toFieldValue(const 
types::Variant& in);
     static types::Variant toVariant(const 
boost::shared_ptr<framing::FieldValue>& val);
 
+    // For Clustering: management objects that have been marked as
+    // "deleted", but are waiting for their last published object
+    // update are not visible to the cluster replication code.  These
+    // interfaces allow clustering to gather up all the management
+    // objects that are deleted in order to allow all clustered
+    // brokers to publish the same set of deleted objects.
+
+    class DeletedObject {
+      public:
+        typedef boost::shared_ptr<DeletedObject> shared_ptr;
+        DeletedObject() {};
+        DeletedObject( const std::string &encoded );
+        ~DeletedObject() {};
+        void encode( std::string& toBuffer );
+
+      private:
+      friend class ManagementAgent;
+
+        std::string packageName;
+        std::string className;
+        std::string objectId;
+
+        std::string encodedV1Config;    // qmfv1 properties
+        std::string encodedV1Inst;      // qmfv1 statistics
+        qpid::types::Variant::Map encodedV2;
+    };
+
+    typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
+
+    /** returns a snapshot of all currently deleted management objects. */
+    void exportDeletedObjects( DeletedObjectList& outList );
+
+    /** Import a list of deleted objects to send on next publish interval. */
+    void importDeletedObjects( const DeletedObjectList& inList );
 
 private:
     struct Periodic : public qpid::sys::TimerTask
@@ -293,6 +327,13 @@ private:
     // message.
     uint32_t maxV2ReplyObjs;
 
+    // list of objects that have been deleted, but have yet to be published
+    // one final time.
+    // Indexed by a string composed of the object's package and class name.
+    // Protected by userLock.
+    typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
+    PendingDeletedObjsMap pendingDeletedObjs;
+
 #   define MA_BUFFER_SIZE 65536
     char inputBuffer[MA_BUFFER_SIZE];
     char outputBuffer[MA_BUFFER_SIZE];

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1041181&r1=1041180&r2=1041181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Dec  1 21:33:12 2010
@@ -298,7 +298,7 @@ class LongTests(BrokerTest):
         receiver.stop()
         for i in range(i, len(cluster)): cluster[i].kill()
 
-    def test_management(self):
+    def test_management(self, args=[]):
         """Stress test: Run management clients and other clients 
concurrently."""
 
         class ClientLoop(StoppableThread):
@@ -353,7 +353,7 @@ class LongTests(BrokerTest):
                 StoppableThread.stop(self)
 
         # def test_management
-        args = ["--mgmt-pub-interval", 1] # Publish management information 
every second.
+        args += ["--mgmt-pub-interval", 1] # Publish management information 
every second.
         # Use store if present.
         if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
         cluster = self.cluster(3, args)
@@ -402,6 +402,9 @@ class LongTests(BrokerTest):
         for c in chain(mclients, *clients):
             c.stop()
 
+    def test_management_qmf2(self):
+        self.test_management(args=["--mgmt-qmf2=yes"])
+
 class StoreTests(BrokerTest):
     """
     Cluster tests that can only be run if there is a store available.

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1041181&r1=1041180&r2=1041181&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Dec  1 21:33:12 2010
@@ -263,20 +263,10 @@
       <field name="consumer" type="uint32"/>
     </control>
 
-    <!-- Replicate management agent schema -->
-    <control name="management-schema" code="0x35">
-      <field name="data" type="vbin32"/>
-    </control>
-
     <!-- added by jrd.  propagate a management-setup-state widget -->
     <control name="management-setup-state" code="0x36">
       <field name="objectNum" type="uint64"/>
       <field name="bootSequence" type="uint16"/>
     </control>
-
-    <!-- Replicate management agent's remote-agent map -->
-    <control name="management-agents" code="0x37">
-      <field name="data" type="vbin32"/>
-    </control>
   </class>
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to