Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
 Tue Jan 27 15:00:13 2015
@@ -73,6 +73,7 @@ class ProtocolImpl : public BrokerContex
     qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, 
qpid::sys::OutputControl&, const std::string&, const 
qpid::sys::SecuritySettings&);
     boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> 
translate(const qpid::broker::Message&);
     boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
+    qpid::framing::ProtocolVersion supportedVersion() const;
   private:
 };
 
@@ -158,5 +159,10 @@ boost::shared_ptr<RecoverableMessage> Pr
     }
 }
 
+qpid::framing::ProtocolVersion ProtocolImpl::supportedVersion() const
+{
+    return qpid::framing::ProtocolVersion(1,0);
+}
+
 
 }}} // namespace qpid::broker::amqp

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.cpp 
(original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.cpp 
Tue Jan 27 15:00:13 2015
@@ -23,6 +23,7 @@
 #include "qpid/log/Statement.h"
 #include <algorithm>
 #include <string.h>
+#include "config.h"
 
 namespace qpid {
 namespace broker {
@@ -126,7 +127,13 @@ bool OutgoingFromRelay::doWork()
 {
     relay->check();
     relay->setCredit(pn_link_credit(link));
-    return relay->send(link);
+    bool worked = relay->send(link);
+    pn_delivery_t *d = pn_link_current(link);
+    if (d && pn_delivery_writable(d)) {
+        handle(d);
+        return true;
+    }
+    return worked;
 }
 /**
  * Called when a delivery is writable
@@ -163,7 +170,7 @@ void OutgoingFromRelay::handle(pn_delive
 /**
  * Signals that this link has been detached
  */
-void OutgoingFromRelay::detached()
+void OutgoingFromRelay::detached(bool /*closed*/)
 {
     relay->detached(this);
 }
@@ -221,7 +228,7 @@ uint32_t IncomingToRelay::getCredit()
     return relay->getCredit();
 }
 
-void IncomingToRelay::detached()
+void IncomingToRelay::detached(bool /*closed*/)
 {
     relay->detached(this);
 }
@@ -238,7 +245,11 @@ void BufferedTransfer::initIn(pn_link_t*
     //copy delivery tag
     pn_delivery_tag_t dt = pn_delivery_tag(d);
     tag.resize(dt.size);
+#ifdef NO_PROTON_DELIVERY_TAG_T
+    ::memmove(&tag[0], dt.start, dt.size);
+#else
     ::memmove(&tag[0], dt.bytes, dt.size);
+#endif
 
     //set context
     pn_delivery_set_context(d, this);
@@ -258,7 +269,11 @@ bool BufferedTransfer::settle()
 void BufferedTransfer::initOut(pn_link_t* link)
 {
     pn_delivery_tag_t dt;
+#ifdef NO_PROTON_DELIVERY_TAG_T
+    dt.start = &tag[0];
+#else
     dt.bytes = &tag[0];
+#endif
     dt.size = tag.size();
     out.handle = pn_delivery(link, dt);
     //set context

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.h?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.h 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.h 
Tue Jan 27 15:00:13 2015
@@ -100,7 +100,7 @@ class OutgoingFromRelay : public Outgoin
                       const std::string& target, const std::string& name, 
boost::shared_ptr<Relay>);
     bool doWork();
     void handle(pn_delivery_t* delivery);
-    void detached();
+    void detached(bool closed);
     void init();
     void setSubjectFilter(const std::string&);
     void setSelectorFilter(const std::string&);
@@ -118,7 +118,7 @@ class IncomingToRelay : public Incoming
     bool settle();
     bool doWork();
     bool haveWork();
-    void detached();
+    void detached(bool closed);
     void readable(pn_delivery_t* delivery);
     uint32_t getCredit();
   private:

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp 
(original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp 
Tue Jan 27 15:00:13 2015
@@ -48,6 +48,7 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/Codecs.h"
+#include "config.h"
 #include <boost/intrusive_ptr.hpp>
 #include <boost/format.hpp>
 #include <map>
@@ -505,10 +506,11 @@ void Session::setupOutgoing(pn_link_t* l
             if (!settings.autodelete) settings.autodelete = autodelete;
             altExchange = node.topic->getAlternateExchange();
         }
-        if (!settings.autoDeleteDelay) {
+        if (settings.original.find("qpid.auto_delete_timeout") == 
settings.original.end()) {
             //only use delay from link if policy didn't specify one
             settings.autoDeleteDelay = pn_terminus_get_timeout(source);
-            settings.original["qpid.auto_delete_timeout"] = 
settings.autoDeleteDelay;
+            if (settings.autoDeleteDelay)
+                settings.original["qpid.auto_delete_timeout"] = 
settings.autoDeleteDelay;
         }
         if (settings.autoDeleteDelay) {
             settings.autodelete = true;
@@ -577,7 +579,7 @@ void Session::detach(pn_link_t* link)
     if (pn_link_is_sender(link)) {
         OutgoingLinks::iterator i = outgoing.find(link);
         if (i != outgoing.end()) {
-            i->second->detached();
+            i->second->detached(true/*TODO: check whether actually closed; 
see PROTON-773*/);
             boost::shared_ptr<Queue> q = 
OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get());
             if (q && !q->isAutoDelete() && !q->isDeleted()) {
                 connection.getBroker().deleteQueue(q->getName(), 
connection.getUserId(), connection.getMgmtId());
@@ -588,7 +590,7 @@ void Session::detach(pn_link_t* link)
     } else {
         IncomingLinks::iterator i = incoming.find(link);
         if (i != incoming.end()) {
-            i->second->detached();
+            i->second->detached(true/*TODO: check whether actually closed; 
see PROTON-773*/);
             incoming.erase(i);
             QPID_LOG(debug, "Incoming link detached");
         }
@@ -615,7 +617,11 @@ void Session::accepted(pn_delivery_t* de
 void Session::readable(pn_link_t* link, pn_delivery_t* delivery)
 {
     pn_delivery_tag_t tag = pn_delivery_tag(delivery);
+#ifdef NO_PROTON_DELIVERY_TAG_T
+    QPID_LOG(debug, "received delivery: " << std::string(tag.start, tag.size));
+#else
     QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size));
+#endif
     incomingMessageReceived();
     IncomingLinks::iterator target = incoming.find(link);
     if (target == incoming.end()) {
@@ -653,7 +659,7 @@ bool Session::dispatch()
             pn_condition_set_name(error, e.symbol());
             pn_condition_set_description(error, e.what());
             pn_link_close(s->first);
-            s->second->detached();
+            s->second->detached(true);
             outgoing.erase(s++);
             output = true;
         }
@@ -678,7 +684,7 @@ bool Session::dispatch()
             pn_condition_set_name(error, e.symbol());
             pn_condition_set_description(error, e.what());
             pn_link_close(i->first);
-            i->second->detached();
+            i->second->detached(true);
             incoming.erase(i++);
             output = true;
         }
@@ -690,10 +696,10 @@ bool Session::dispatch()
 void Session::close()
 {
     for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); 
++i) {
-        i->second->detached();
+        i->second->detached(false);
     }
     for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); 
++i) {
-        i->second->detached();
+        i->second->detached(false);
     }
     outgoing.clear();
     incoming.clear();

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
 Tue Jan 27 15:00:13 2015
@@ -55,6 +55,7 @@ using qpid::messaging::AssertionFailed;
 using qpid::framing::ExchangeBoundResult;
 using qpid::framing::ExchangeQueryResult;
 using qpid::framing::FieldTable;
+using qpid::framing::FieldValue;
 using qpid::framing::QueueQueryResult;
 using qpid::framing::ReplyTo;
 using qpid::framing::Uuid;
@@ -140,6 +141,11 @@ const std::string PREFIX_AMQ("amq.");
 const std::string PREFIX_QPID("qpid.");
 
 const Verifier verifier;
+
+bool areEquivalent(const FieldValue& a, const FieldValue& b)
+{
+    return ((a == b) || (a.convertsTo<int64_t>() && b.convertsTo<int64_t>() && 
a.get<int64_t>() == b.get<int64_t>()));
+}
 }
 
 struct Binding
@@ -534,19 +540,19 @@ Subscription::Subscription(const Address
       reliable(durable ? !AddressResolution::is_unreliable(address) : 
AddressResolution::is_reliable(address)),
       actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : 
specifiedType) : type),
       exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
-      autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(true)),
+      
autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(!(durable || 
reliable))),
       
exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)),
       alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str())
 {
-    const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
-    if (timeout) {
+
+    if ((Opt(address)/LINK).hasKey(TIMEOUT)) {
+        const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value;
         if (timeout->asUint32()) 
queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32());
-    } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) {
-        //if durable but not explicitly reliable, then set a non-zero
-        //default for the autodelete timeout (previously this would
-        //have defaulted to autodelete immediately anyway, so the risk
-        //of the change causing problems is mitigated)
-        queueOptions.setInt("qpid.auto_delete_delay", 15*60);
+    } else if (durable && !reliable && 
!(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) {
+        //if durable but not reliable, and auto-delete not
+        //explicitly set, then set a non-zero default for the
+        //autodelete timeout
+        queueOptions.setInt("qpid.auto_delete_timeout", 2*60);
     }
     (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
     (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -609,7 +615,7 @@ void Subscription::subscribe(qpid::clien
 
     //create subscription queue:
     session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
-                         arg::autoDelete=autoDeleteQueue && (!(durable || 
reliable)), arg::durable=durable,
+                         arg::autoDelete=autoDeleteQueue, arg::durable=durable,
                          arg::alternateExchange=alternateExchange,
                          arg::arguments=queueOptions);
     //'default' binding:
@@ -806,7 +812,7 @@ void Queue::checkAssert(qpid::client::As
                 FieldTable::ValuePtr v = result.getArguments().get(i->first);
                 if (!v) {
                     throw AssertionFailed((boost::format("Option %1% not set 
for %2%") % i->first % name).str());
-                } else if (*i->second != *v) {
+                } else if (!areEquivalent(*i->second, *v)) {
                     throw AssertionFailed((boost::format("Option %1% does not 
match for %2%, expected %3%, got %4%")
                                           % i->first % name % *(i->second) % 
*v).str());
                 }
@@ -906,7 +912,7 @@ void Exchange::checkAssert(qpid::client:
                 FieldTable::ValuePtr v = result.getArguments().get(i->first);
                 if (!v) {
                     throw AssertionFailed((boost::format("Option %1% not set 
for %2%") % i->first % name).str());
-                } else if (*i->second != *v) {
+                } else if (!areEquivalent(*i->second, *v)) {
                     throw AssertionFailed((boost::format("Option %1% does not 
match for %2%, expected %3%, got %4%")
                                           % i->first % name % *(i->second) % 
*v).str());
                 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/framing/SequenceSet.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/framing/SequenceSet.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/framing/SequenceSet.cpp 
(original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/framing/SequenceSet.cpp 
Tue Jan 27 15:00:13 2015
@@ -33,7 +33,18 @@ namespace framing {
 
 namespace {
 //each range contains 2 numbers, 4 bytes each
-uint16_t RANGE_SIZE = 2 * 4; 
+uint16_t RANGE_SIZE = 2 * 4;
+int32_t MAX_RANGE = 2147483647;//2^31-1
+
+int32_t gap(const SequenceNumber& a, const SequenceNumber& b)
+{
+    return a < b ? b - a : a - b;
+}
+
+bool is_max_range(const SequenceNumber& a, const SequenceNumber& b)
+{
+    return gap(a, b) == MAX_RANGE;
+}
 }
 
 void SequenceSet::encode(Buffer& buffer) const
@@ -54,7 +65,21 @@ void SequenceSet::decode(Buffer& buffer)
         throw IllegalArgumentException(QPID_MSG("Invalid size for sequence 
set: " << size)); 
 
     for (uint16_t i = 0; i < count; i++) {
-        add(SequenceNumber(buffer.getLong()), 
SequenceNumber(buffer.getLong()));
+        SequenceNumber a(buffer.getLong());
+        SequenceNumber b(buffer.getLong());
+        if (b < a)
+            throw IllegalArgumentException(QPID_MSG("Invalid range in sequence 
set: " << a << " -> " << b));
+        if (is_max_range(a, b)) {
+            //RangeSet holds 'half-closed' ranges, where the end is
+            //one past the 'highest' value in the range. So if the
+            //range is already the maximum expressible with a 32-bit
+            //sequence number, we can't represent it as a
+            //'half-closed' range, so we represent it as two ranges.
+            add(a, b-1);
+            add(b);
+        } else {
+            add(a, b);
+        }
     }
 }
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/ha/Primary.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/ha/Primary.cpp 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/ha/Primary.cpp Tue 
Jan 27 15:00:13 2015
@@ -482,6 +482,7 @@ shared_ptr<PrimaryTxObserver> Primary::m
 {
     shared_ptr<PrimaryTxObserver> observer =
         PrimaryTxObserver::create(*this, haBroker, txBuffer);
+    sys::Mutex::ScopedLock l(lock);
     txMap[observer->getTxQueue()->getName()] = observer;
     return observer;
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/ISSUES
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/ISSUES 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/ISSUES 
Tue Jan 27 15:00:13 2015
@@ -25,7 +25,7 @@ Current/pending:
  ------ -------  ----------------------
    5359 -        Linearstore: Implement new management schema and wire into 
store
    5360 -        Linearstore: Evaluate and rework logging to produce a 
consistent log output
-   5361 1145359  Linearstore: No tests for linearstore functionality currently 
exist
+*  5361 1145359  Linearstore: No tests for linearstore functionality currently 
exist
                    svn r.1564893 2014-02-05: Added tx-test-soak.sh
                    svn r.1564935 2014-02-05: Added license text to 
tx-test-soak.sh
                    svn r.1625283 2014-09-16: Basic python tests from 
legacystore ported over to linearstore
@@ -37,15 +37,22 @@ Current/pending:
                    ** Basic performance tests
    5464 -        [linearstore] Incompletely created journal files accumulate 
in EFP
    -    1088944  [Linearstore] store does not return all files to EFP after 
purging big queue <queue purge issue>
-   6043 1066256  [LinearStore] changing efp size after using store broke the 
new durable nodes creation
+*  -    1066256  [LinearStore] changing efp size after using store broke the 
new durable nodes creation
    -    1067480  [LinearStore] Provide a way to limit max count/size of empty 
files in EFP
    -    1067429  [LinearStore] last file from deleted queue is not moved to 
EFP  <queue delete issue>
-   -    1067482  [LinearStore] Provide a way to prealocate empty pages in EFP
-*  5671          [linearstore] Add ability to use disk partitions and select 
per-queue EFPs
-                   svn r.1636598 2014-11-04: WIP: New EFP and journal dir 
structure using symlinks
-                   svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir 
structure to new
-   6230 1165200  [linearstore] qpid-qls-analyze fails when analyzing empty 
journal
-                   svn r.1643053 2014-11-18: Proposed fix
+   -    1067482  [LinearStore] Provide a way to preallocate empty pages in EFP
+*  6303 1180660  [linearstore] Roll back auto-upgrade of store directory 
structure 
+*  5362 1145363  Linearstore: No store tools exist for examining the journals
+                   svn r.1556888 2014-01-09: WIP checkin for linearstore 
version of qpid_qls_analyze. Needs testing and tidy-up.
+                   svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
+                   svn r.1561848 2014-01-27: Bugfixes and enhancements for 
qpid_qls_analyze
+                   svn r.1564808 2014-02-05: Bugfixes and enhancements for 
qpid_qls_analyze
+                   svn r.1578899 2014-03-18: Bugfixes and enhancements for 
qpid_qls_analyze
+                   svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
+                   * Store analysis and status
+                   * Recovery/reading of message content
+                   * Empty file pool status and management
+   
 
 
 
@@ -115,16 +122,6 @@ NO-JIRA -        Added missing Apache co
    5651       -  [C++ broker] segfault in 
qpid::linearstore::journal::jdir::clear_dir when declaring durable queue
                    svn r.1582730 2014-03-28 Proposed fix by Pavel Moravec
                    * Bug introduced by r.1578899.
-   5362 1145363  Linearstore: No store tools exist for examining the journals
-                   svn r.1556888 2014-01-09: WIP checkin for linearstore 
version of qpid_qls_analyze. Needs testing and tidy-up.
-                   svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
-                   svn r.1561848 2014-01-27: Bugfixes and enhancements for 
qpid_qls_analyze
-                   svn r.1564808 2014-02-05: Bugfixes and enhancements for 
qpid_qls_analyze
-                   svn r.1578899 2014-03-18: Bugfixes and enhancements for 
qpid_qls_analyze
-                   svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
-                   * Store analysis and status
-                   * Recovery/reading of message content
-                   * Empty file pool status and management
    5661       -  [linearstore] Set default cmake build to exclude linearstore
                    svn r.1584379 2014-04-03 Proposed solution.
                    * Run ccmake, select BUILD_LINEARSTORE to change its value 
to ON to build.
@@ -147,8 +144,16 @@ NO-JIRA -        Added missing Apache co
                    svn r.1631360 2014-10-13 Proposed solution
    6157 1150397  linearstore: segfault when 2 journals request new journal 
file from empty EFP
                    svn r.1632504 2014-10-17 Proposed solution by pmoravec   
-   6248  1167911 [linearstore] Symlink creation fails if store dir path is not 
absolute
+   6230 1165200  [linearstore] qpid-qls-analyze fails when analyzing empty 
journal
+                   svn r.1643053 2014-11-18: Proposed fix
+   6248 1167911  [linearstore] Symlink creation fails if store dir path is not 
absolute
                    svn r.1641689 2014-11-25 Proposed solution
+   5671 1160367  [linearstore] Add ability to use disk partitions and select 
per-queue EFPs
+                   svn r.1636598 2014-11-04: WIP: New EFP and journal dir 
structure using symlinks
+                   svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir 
structure to new
+                   svn r.1649081 2015-01-02: WIP: Specify new queue using 
qpid-config --durable together with --efp-partition-num and/or 
--efp-pool-file-size. Needs testing.
+   -    1148807  [linearstore] Restarting broker with empty journal raises 
confusing warning
+                   Fixed by svn r.1649081 of bug 5671 / 1160367 above
                    
 
 Ordered checkin list:
@@ -187,14 +192,17 @@ no.   svn r  Q-JIRA     RHBZ       Date
 28. 1596509    5767  1098118 2014-05-21 0.22-mrg (pmoravec)
 29. 1596633 NO-JIRA  1078937 2014-05-21 (includes tools install update)
 30. 1599243    5767  1098118 2014-06-02 0.22-mrg
-30. 1599243    5767  1098118 2014-06-02
-31. 1614665    5924  1124906 2014-07-30
-32. 1620426    6043  1089652 2014-08-25
-33. 1631360    6147  1152012 2014-10-13 (pmoravec)
-34. 1632504    6157  1150397 2014-10-17 (pmoravec)
-35. 1636598    5671          2014-11-04
-36. 1637985    5671          2014-11-10
-37. 1641689    6248  1167911 2014-11-25
+31. 1599243    5767  1098118 2014-06-02
+32. 1614665    5924  1124906 2014-07-30
+33. 1620426    6043  1089652 2014-08-25
+34. 1631360    6147  1152012 2014-10-13 (pmoravec)
+35. 1632504    6157  1150397 2014-10-17 (pmoravec)
+36. 1636598    5671  1160367 2014-11-04
+37. 1637985    5671  1160367 2014-11-10
+38. 1643053    6230  1165200 2014-11-18
+39. 1641689    6248  1167911 2014-11-25
+40. 1649081    5671  1160367 2015-01-02
+41. 1649082 NO-JIRA        - 2015-01-02
 
 See above sections for details on these checkins.
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
 Tue Jan 27 15:00:13 2015
@@ -148,7 +148,7 @@ void MessageStoreImpl::initManagement ()
             mgmtObject = 
qmf::org::apache::qpid::linearstore::Store::shared_ptr (
                 new qmf::org::apache::qpid::linearstore::Store(agent, this, 
broker));
 
-            mgmtObject->set_location(storeDir);
+            mgmtObject->set_storeDir(storeDir);
             mgmtObject->set_tplIsInitialized(false);
             mgmtObject->set_tplDirectory(getTplBaseDir());
             mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * 
QLS_SBLK_SIZE_BYTES);
@@ -406,7 +406,7 @@ void MessageStoreImpl::create(qpid::brok
 
     if (queue_.getName().size() == 0)
     {
-        QLS_LOG(error, "Cannot create store for empty (null) queue name - 
ignoring and attempting to continue.");
+        QLS_LOG(error, "Cannot create store for empty (null) queue name - 
queue create ignored.");
         return;
     }
 
@@ -449,15 +449,15 @@ qpid::linearstore::journal::EmptyFilePoo
 MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) {
     qpid::framing::FieldTable::ValuePtr value;
     qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = 
defaultEfpPartitionNumber;
-    value = args_.get("qpid.efp_partition");
+    value = args_.get("qpid.efp_partition_num");
     if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
-        localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), 
"qpid.efp_partition");
+        localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), 
"qpid.efp_partition_num");
     }
 
     qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = 
defaultEfpFileSize_kib;
-    value = args_.get("qpid.efp_file_size");
+    value = args_.get("qpid.efp_pool_file_size");
     if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) {
-        localEfpFileSizeKib = 
chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" );
+        localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(), 
"qpid.efp_pool_file_size");
     }
     return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib);
 }
@@ -1488,21 +1488,21 @@ std::string MessageStoreImpl::getStoreTo
 std::string MessageStoreImpl::getJrnlBaseDir()
 {
     std::ostringstream dir;
-    dir << storeDir << "/" << storeTopLevelDir << "/jrnl/" ;
+    dir << storeDir << "/" << storeTopLevelDir << "/jrnl2/" ;
     return dir.str();
 }
 
 std::string MessageStoreImpl::getBdbBaseDir()
 {
     std::ostringstream dir;
-    dir << storeDir << "/" << storeTopLevelDir << "/dat/" ;
+    dir << storeDir << "/" << storeTopLevelDir << "/dat2/" ;
     return dir.str();
 }
 
 std::string MessageStoreImpl::getTplBaseDir()
 {
     std::ostringstream dir;
-    dir << storeDir << "/" << storeTopLevelDir << "/tpl/" ;
+    dir << storeDir << "/" << storeTopLevelDir << "/tpl2/" ;
     return dir.str();
 }
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
 Tue Jan 27 15:00:13 2015
@@ -59,10 +59,16 @@ EmptyFilePool::EmptyFilePool(const std::
 EmptyFilePool::~EmptyFilePool() {}
 
 void EmptyFilePool::initialize() {
-//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition 
" << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ 
<< std::endl; // DEBUG
-    std::vector<std::string> dirList;
+    if (::mkdir(efpDirectory_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) 
{ // Create EFP dir if it does not yet exist
+        if (errno != EEXIST) {
+            std::ostringstream oss;
+            oss << "directory=" << efpDirectory_ << " " << 
FORMAT_SYSERR(errno);
+            throw jexception(jerrno::JERR_EFP_MKDIR, oss.str(), 
"EmptyFilePool", "initialize");
+        }
+    }
 
     // Process empty files in main dir
+    std::vector<std::string> dirList;
     jdir::read_dir(efpDirectory_, dirList, false, true, false, false);
     for (std::vector<std::string>::iterator i = dirList.begin(); i != 
dirList.end(); ++i) {
         size_t dotPos = i->rfind(".");
@@ -122,14 +128,14 @@ const efpIdentity_t EmptyFilePool::getId
 
 std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
     std::string emptyFileName = popEmptyFile();
-    std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + 
emptyFileName.substr(emptyFileName.rfind('/'));
+    std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + 
emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes 
leading '/'
     std::string symlinkName = destDirectory + 
emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes 
leading '/'
-    if (moveFile(emptyFileName, newFileName)) {
+    if (!moveFile(emptyFileName, newFileName)) {
         // Try again with new UUID for file name
         newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + 
getEfpFileName();
-        if (moveFile(emptyFileName, newFileName)) {
+        if (!moveFile(emptyFileName, newFileName)) {
 //std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from  
EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG
-            pushEmptyFile(emptyFileName);
+            pushEmptyFile(emptyFileName); // Return empty file to pool
             std::ostringstream oss;
             oss << "file=\"" << emptyFileName << "\" dest=\"" <<  newFileName 
<< "\"" << FORMAT_SYSERR(errno);
             throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), 
"EmptyFilePool", "takeEmptyFile");
@@ -138,7 +144,7 @@ std::string EmptyFilePool::takeEmptyFile
     if (createSymLink(newFileName, symlinkName)) {
         std::ostringstream oss;
         oss << "file=\"" << emptyFileName << "\" dest=\"" <<  newFileName << 
"\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", 
"takeEmptyFile");
+        throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", 
"takeEmptyFile");
     }
     return symlinkName;
 }
@@ -189,12 +195,27 @@ efpDataSize_kib_t EmptyFilePool::dataSiz
 }
 
 // --- protected functions ---
+void EmptyFilePool::checkIosState(std::ofstream& ofs,
+                                  const uint32_t jerrno,
+                                  const std::string& fqFileName,
+                                  const std::string& operation,
+                                  const std::string& errorMessage,
+                                  const std::string& className,
+                                  const std::string& fnName) {
+    if (!ofs.good()) {
+        if (ofs.is_open()) {
+            ofs.close();
+        }
+        std::ostringstream oss;
+        oss << "IO failure: eofbit=" << (ofs.eof()?"T":"F") << " failbit=" << 
(ofs.fail()?"T":"F") << " badbit="
+            << (ofs.bad()?"T":"F") << " file=" << fqFileName << " operation=" 
<< operation << ": " << errorMessage;
+        throw jexception(jerrno, oss.str(), className, fnName);
+    }
+}
 
 std::string EmptyFilePool::createEmptyFile() {
     std::string efpfn = getEfpFileName();
-    if (!overwriteFileContents(efpfn)) {
-        // TODO: handle failure to prepare new file here
-    }
+    overwriteFileContents(efpfn);
     return efpfn;
 }
 
@@ -226,24 +247,20 @@ void EmptyFilePool::initializeSubDirecto
     }
 }
 
-bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
+void EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
     ::file_hdr_t fh;
     ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 
QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), 
efpDataSize_kib_);
     std::ofstream ofs(fqFileName.c_str(), std::ofstream::out | 
std::ofstream::binary);
-    if (ofs.good()) {
-        ofs.write((char*)&fh, sizeof(::file_hdr_t));
-        uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * 
QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
-        while (rem--)
-            ofs.put('\0');
-        ofs.close();
-        return true;
-//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created 
new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the 
fly" << std::endl; // DEBUG
-    } else {
-        std::ostringstream oss;
-        oss << "std::ofstream ofs: file=\"" << fqFileName.c_str() << "\"" << " 
failed to be open";
-        throw jexception(jerrno::JERR_EFP_FOPEN, oss.str(), "EmptyFilePool", 
"overwriteFileContents");
+    checkIosState(ofs, jerrno::JERR_EFP_FOPEN, fqFileName, "constructor", 
"Failed to create file", "EmptyFilePool", "overwriteFileContents");
+    ofs.write((char*)&fh, sizeof(::file_hdr_t));
+    checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "write()", "Failed 
to write header", "EmptyFilePool", "overwriteFileContents");
+    uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * 
QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t);
+    while (rem--) {
+        ofs.put('\0');
+        checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "put()", 
"Failed to put \0", "EmptyFilePool", "overwriteFileContents");
     }
-    return false;
+    ofs.close();
+//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created 
new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the 
fly" << std::endl; // DEBUG
 }
 
 std::string EmptyFilePool::popEmptyFile() {
@@ -271,7 +288,7 @@ void EmptyFilePool::pushEmptyFile(const
 
 void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
     std::string returnedFileName = efpDirectory_ + "/" + 
s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // 
NOTE: substr() includes leading '/'
-    if (moveFile(emptyFileName, returnedFileName)) {
+    if (!moveFile(emptyFileName, returnedFileName)) {
         ::unlink(emptyFileName.c_str());
 //std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " 
<< returnedFileName << "; deleted." << std::endl; // DEBUG
     }
@@ -283,7 +300,7 @@ void EmptyFilePool::returnEmptyFile(cons
         overwriteFileContents(returnedFileName);
     }
     std::string sanitizedEmptyFileName = efpDirectory_ + 
returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() 
includes leading '/'
-    if (moveFile(returnedFileName, sanitizedEmptyFileName)) {
+    if (!moveFile(returnedFileName, sanitizedEmptyFileName)) {
         ::unlink(returnedFileName.c_str());
 //std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to 
" << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG
     } else {
@@ -395,18 +412,6 @@ bool EmptyFilePool::validateEmptyFile(co
     return true;
 }
 
-// static
-int EmptyFilePool::moveFile(const std::string& from,
-                                 const std::string& to) {
-    if (::rename(from.c_str(), to.c_str())) {
-        if (errno == EEXIST) return errno; // File name exists
-        std::ostringstream oss;
-        oss << "file=\"" << from << "\" dest=\"" <<  to << "\"" << 
FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", 
"returnEmptyFile");
-    }
-    return 0;
-}
-
 //static
 int EmptyFilePool::createSymLink(const std::string& fqFileName,
                                  const std::string& fqLinkName) {
@@ -414,7 +419,7 @@ int EmptyFilePool::createSymLink(const s
         if (errno == EEXIST) return errno; // File name exists
         std::ostringstream oss;
         oss << "file=\"" << fqFileName << "\" symlink=\"" <<  fqLinkName << 
"\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", 
"createSymLink");
+        throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", 
"createSymLink");
     }
     return 0;
 }
@@ -426,7 +431,7 @@ std::string EmptyFilePool::deleteSymlink
     if (len < 0) {
         std::ostringstream oss;
         oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", 
"deleteSymlink");
+        throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", 
"deleteSymlink");
     }
     ::unlink(fqLinkName.c_str());
     return std::string(buff, len);
@@ -455,4 +460,18 @@ bool EmptyFilePool::isSymlink(const std:
 
 }
 
+// static
+bool EmptyFilePool::moveFile(const std::string& from,
+                             const std::string& to) {
+    if (::rename(from.c_str(), to.c_str())) {
+        if (errno == EEXIST) {
+            return false; // File name exists
+        }
+        std::ostringstream oss;
+        oss << "file=\"" << from << "\" dest=\"" <<  to << "\"" << 
FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", 
"returnEmptyFile");
+    }
+    return true;
+}
+
 }}}

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
 Tue Jan 27 15:00:13 2015
@@ -87,23 +87,30 @@ public:
                                                      const 
efpPartitionNumber_t partitionNumber);
 
 protected:
+    void checkIosState(std::ofstream& ofs,
+                       const uint32_t jerrno,
+                       const std::string& fqFileName,
+                       const std::string& operation,
+                       const std::string& errorMessage,
+                       const std::string& className,
+                       const std::string& fnName);
     std::string createEmptyFile();
     std::string getEfpFileName();
     void initializeSubDirectory(const std::string& fqDirName);
-    bool overwriteFileContents(const std::string& fqFileName);
+    void overwriteFileContents(const std::string& fqFileName);
     std::string popEmptyFile();
     void pushEmptyFile(const std::string fqFileName);
     void returnEmptyFile(const std::string& emptyFileName);
     void resetEmptyFileHeader(const std::string& fqFileName);
     bool validateEmptyFile(const std::string& emptyFileName) const;
 
-    static int moveFile(const std::string& fromFqPath,
-                        const std::string& toFqPath);
     static int createSymLink(const std::string& fqFileName,
                              const std::string& fqLinkName);
     static std::string deleteSymlink(const std::string& fqLinkName);
     static bool isFile(const std::string& fqName);
     static bool isSymlink(const std::string& fqName);
+    static bool moveFile(const std::string& fromFqPath,
+                         const std::string& toFqPath);
 };
 
 }}}

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
 Tue Jan 27 15:00:13 2015
@@ -74,6 +74,7 @@ void EmptyFilePoolManager::findEfpPartit
         if (!foundPartition) {
             std::ostringstream oss1;
             oss1 << qlsStorePath_ << "/" << 
EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
+                << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_
                 << "/" << 
EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
             jdir::create_dir(oss1.str());
             insertPartition(defaultPartitionNumber_, oss1.str());
@@ -165,9 +166,10 @@ EmptyFilePool* EmptyFilePoolManager::get
 EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const 
efpPartitionNumber_t partitionNumber,
                                                       const efpDataSize_kib_t 
efpDataSize_kib) {
     EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber > 0 ? 
partitionNumber : defaultPartitionNumber_);
-    if (efppp != 0)
-        return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : 
defaultEfpDataSize_kib_);
-    return 0;
+    if (efppp == 0) {
+        return 0;
+    }
+    return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : 
defaultEfpDataSize_kib_, true);
 }
 
 void EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& 
emptyFilePoolList,

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
 Tue Jan 27 15:00:13 2015
@@ -32,6 +32,9 @@ namespace qpid {
 namespace linearstore {
 namespace journal {
 
+// static
+const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets 
the top-level efp dir within a partition
+
 EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t 
partitionNum,
                                                const std::string& partitionDir,
                                                const bool 
overwriteBeforeReturnFlag,
@@ -57,72 +60,31 @@ EmptyFilePoolPartition::~EmptyFilePoolPa
 void
 EmptyFilePoolPartition::findEmptyFilePools() {
 //std::cout << "*** EmptyFilePoolPartition::findEmptyFilePools(): Reading " << 
partitionDir_ << std::endl; // DEBUG
-    std::vector<std::string> dirList;
-    bool upgradeDirStructureFlag = false;
-    std::string oldPartitionDir;
-        jdir::read_dir(partitionDir_, dirList, true, false, false, false);
-//std::cout << "*** dirList.size()=" << dirList.size() << "; dirList.front()=" 
<< dirList.front() << std::endl; // DEBUG
-        if (dirList.size() == 1 && dirList.front().compare("efp") == 0) {
-            upgradeDirStructureFlag = true;
-            oldPartitionDir = partitionDir_ + "/efp";
-//std::cout << "*** oldPartitionDir=" << oldPartitionDir << std::endl; // DEBUG
-            dirList.clear();
-            jdir::read_dir(oldPartitionDir, dirList, true, false, false, 
false);
-        }
+    std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
+    if (jdir::is_dir(efpDir)) {
+        std::vector<std::string> dirList;
+        jdir::read_dir(efpDir, dirList, true, false, false, true);
         for (std::vector<std::string>::iterator i = dirList.begin(); i != 
dirList.end(); ++i) {
-            std::string fqFileName(partitionDir_ + "/" + *i);
-            if (upgradeDirStructureFlag) {
-                std::string fqOldFileName(partitionDir_ + "/efp/" + *i);
-                if (::rename(fqOldFileName.c_str(), fqFileName.c_str())) {
-                    // File move failed
-                    std::ostringstream oss;
-                    oss << "File \'" << fqOldFileName << "\' could not be 
renamed to \'" << fqFileName << "\' (" << FORMAT_SYSERR(errno) << "); file 
deleted";
-                    journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
-                    if (::unlink(fqOldFileName.c_str())) {
-                        std::ostringstream oss;
-                        oss << "File \'" << fqOldFileName << "\' could not be 
deleted (" << FORMAT_SYSERR(errno) << "\'; file orphaned";
-                        journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
-                    }
-                }
-            }
-            EmptyFilePool* efpp = 0;
-            try {
-                efpp = new EmptyFilePool(fqFileName, this, 
overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
-                {
-                    slock l(efpMapMutex_);
-                    efpMap_[efpp->dataSize_kib()] = efpp;
-                }
-            }
-            catch (const std::exception& e) {
-                if (efpp != 0) {
-                    delete efpp;
-                    efpp = 0;
-                }
-                std::ostringstream oss;
-                oss << "EmptyFilePool create failed: " << e.what();
-                journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
-            }
-            if (efpp != 0) {
-                efpp->initialize();
-            }
-        }
-        if (upgradeDirStructureFlag) {
-            std::string oldEfpDir(partitionDir_ + "/efp");
-            if (::rmdir(oldEfpDir.c_str())) {
-                // Unable to delete old "efp" dir
-                std::ostringstream oss;
-                oss << "Unable to delete old EFP directory \'" << oldEfpDir << 
"\' (" << FORMAT_SYSERR(errno) << "\'; directory orphaned";
-                journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
-            }
+            createEmptyFilePool(*i);
         }
+    } else {
+        std::ostringstream oss;
+        oss << "Partition \"" << partitionDir_ << "\" does not contain top 
level EFP dir \"" << s_efpTopLevelDir_ << "\"";
+        journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+    }
 }
 
-EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const 
efpDataSize_kib_t efpDataSize_kib) {
-    slock l(efpMapMutex_);
-    efpMapItr_t i = efpMap_.find(efpDataSize_kib);
-    if (i == efpMap_.end())
-        return 0;
-    return i->second;
+EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const 
efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent) {
+    {
+        slock l(efpMapMutex_);
+        efpMapItr_t i = efpMap_.find(efpDataSize_kib);
+        if (i != efpMap_.end())
+            return i->second;
+    }
+    if (createIfNonExistent) {
+        return createEmptyFilePool(efpDataSize_kib);
+    }
+    return 0;
 }
 
 void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& 
efpList) {
@@ -183,7 +145,7 @@ std::string EmptyFilePoolPartition::getP
 //static
 efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const 
std::string& name) {
     if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && 
::isdigit(name[2]) && ::isdigit(name[3])) {
-        long pn = ::strtol(name.c_str() + 1, 0, 0);
+        long pn = ::strtol(name.c_str() + 1, 0, 10);
         if (pn == 0 && errno) {
             return 0;
         } else {
@@ -195,12 +157,42 @@ efpPartitionNumber_t EmptyFilePoolPartit
 
 // --- protected functions ---
 
+EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const 
efpDataSize_kib_t efpDataSize_kib) {
+    std::string fqEfpDirectoryName(partitionDir_ + "/" + 
EmptyFilePoolPartition::s_efpTopLevelDir_ + "/" + 
EmptyFilePool::dirNameFromDataSize(efpDataSize_kib));
+    return createEmptyFilePool(fqEfpDirectoryName);
+}
+
+EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const std::string 
fqEfpDirectoryName) {
+    EmptyFilePool* efpp = 0;
+    try {
+        efpp = new EmptyFilePool(fqEfpDirectoryName, this, 
overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
+        {
+            slock l(efpMapMutex_);
+            efpMap_[efpp->dataSize_kib()] = efpp;
+        }
+    }
+    catch (const std::exception& e) {
+        if (efpp != 0) {
+            delete efpp;
+            efpp = 0;
+        }
+        std::ostringstream oss;
+        oss << "EmptyFilePool create failed: " << e.what();
+        journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+    }
+    if (efpp != 0) {
+        efpp->initialize();
+    }
+    return efpp;
+}
+
 void EmptyFilePoolPartition::validatePartitionDir() {
+    std::ostringstream ss;
     if (!jdir::is_dir(partitionDir_)) {
-        std::ostringstream ss;
         ss << "Invalid partition directory: \'" << partitionDir_ << "\' is not 
a directory";
         throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), 
"EmptyFilePoolPartition", "validatePartitionDir");
     }
+
     // TODO: other validity checks here
 }
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
 Tue Jan 27 15:00:13 2015
@@ -37,6 +37,8 @@ class JournalLog;
 
 class EmptyFilePoolPartition
 {
+public:
+    static const std::string s_efpTopLevelDir_;
 protected:
     typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
     typedef efpMap_t::iterator efpMapItr_t;
@@ -59,7 +61,7 @@ public:
     virtual ~EmptyFilePoolPartition();
 
     void findEmptyFilePools();
-    EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
+    EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, 
const bool createIfNonExistent);
     void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
     void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& 
efpDataSizesList) const;
     std::string getPartitionDirectory() const;
@@ -70,6 +72,8 @@ public:
     static efpPartitionNumber_t getPartitionNumber(const std::string& name);
 
 protected:
+    EmptyFilePool* createEmptyFilePool(const efpDataSize_kib_t 
efpDataSize_kib);
+    EmptyFilePool* createEmptyFilePool(const std::string fqEfpDirectoryName);
     void validatePartitionDir();
 };
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
 Tue Jan 27 15:00:13 2015
@@ -43,6 +43,7 @@
 #include "qpid/linearstore/journal/utils/file_hdr.h"
 #include <sstream>
 #include <string>
+#include <unistd.h>
 #include <vector>
 
 namespace qpid {
@@ -101,7 +102,11 @@ void RecoveryManager::analyzeJournals(co
     analyzeJournalFileHeaders(efpIdentity);
 
     if (journalEmptyFlag_) {
-        *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); 
// Use default EFP
+        if (uninitFileList_.empty()) {
+            *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 
0); // Use default EFP
+        } else {
+            *emptyFilePoolPtrPtr = 
emptyFilePoolManager->getEmptyFilePool(efpIdentity);
+        }
     } else {
         *emptyFilePoolPtrPtr = 
emptyFilePoolManager->getEmptyFilePool(efpIdentity);
         if (! *emptyFilePoolPtrPtr) {
@@ -294,6 +299,7 @@ void RecoveryManager::setLinearFileContr
                                                       LinearFileController* 
lfcPtr) {
     if (journalEmptyFlag_) {
         if (uninitFileList_.size() > 0) {
+            // TODO: Handle case if uninitFileList_.size() > 1, but this 
should not happen in normal operation. Here we assume only one item in the list.
             std::string uninitFile = uninitFileList_.back();
             uninitFileList_.pop_back();
             lfcPtr->restoreEmptyFile(uninitFile);
@@ -377,11 +383,28 @@ void RecoveryManager::analyzeJournalFile
     jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
     for (stringListConstItr_t i = directoryList.begin(); i != 
directoryList.end(); ++i) {
         bool hdrOk = readJournalFileHeader(*i, fileHeader, headerQueueName);
-        if (!hdrOk || headerQueueName.empty()) {
+        bool hdrEmpty = ::is_file_hdr_reset(&fileHeader);
+        if (!hdrOk) {
             std::ostringstream oss;
-            oss << "Journal file " << (*i) << " is uninitialized or corrupted";
+            oss << "Journal file " << (*i) << " is corrupted or invalid";
             journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+        } else if (hdrEmpty) {
+            // Read symlink, find efp directory name which is efp size in KiB
+            // TODO: place this bit into a common function as it is also used 
in EmptyFilePool.cpp::deleteSymlink()
+            char buff[1024];
+            ssize_t len = ::readlink((*i).c_str(), buff, 1024);
+            if (len < 0) {
+                std::ostringstream oss;
+                oss << "symlink=\"" << (*i) << "\"" << FORMAT_SYSERR(errno);
+                throw jexception(jerrno::JERR__SYMLINK, oss.str(), 
"RecoveryManager", "analyzeJournalFileHeaders");
+            }
+            // Find second and third '/' from back of string, which contains 
the EFP directory name
+            *(::strrchr(buff, '/')) = '\0';
+            *(::strrchr(buff, '/')) = '\0';
+            int efpDataSize_kib = atoi(::strrchr(buff, '/') + 1);
             uninitFileList_.push_back(*i);
+            efpIdentity.pn_ = fileHeader._efp_partition;
+            efpIdentity.ds_ = efpDataSize_kib;
         } else if (headerQueueName.compare(queueName_) != 0) {
             std::ostringstream oss;
             oss << "Journal file " << (*i) << " belongs to queue \"" << 
headerQueueName << "\": ignoring";
@@ -406,6 +429,7 @@ void RecoveryManager::analyzeJournalFile
         }
     }
 
+//std::cerr << "*** RecoveryManager::analyzeJournalFileHeaders() 
fileNumberMap_.size()=" << fileNumberMap_.size() << std::endl; // DEBUG
     if (fileNumberMap_.empty()) {
         journalEmptyFlag_ = true;
     } else {
@@ -905,7 +929,9 @@ bool RecoveryManager::readJournalFileHea
     }
     ifs.close();
     ::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t));
-    if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, 
QLS_MAX_QUEUE_NAME_LEN)) return false;
+    if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, 
QLS_MAX_QUEUE_NAME_LEN)) {
+        return false;
+    }
     queueName.assign(buffer + sizeof(::file_hdr_t), 
fileHeaderRef._queue_name_len);
     return true;
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
 Tue Jan 27 15:00:13 2015
@@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__UNEXPRESPON
 const uint32_t jerrno::JERR__RECNFOUND           = 0x0109;
 const uint32_t jerrno::JERR__NOTIMPL             = 0x010a;
 const uint32_t jerrno::JERR__NULL                = 0x010b;
+const uint32_t jerrno::JERR__SYMLINK             = 0x010c;
 
 // class jcntl
 const uint32_t jerrno::JERR_JCNTL_STOPPED        = 0x0200;
@@ -112,10 +113,11 @@ const uint32_t jerrno::JERR_EFP_BADPARTI
 const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME    = 0x0d03;
 const uint32_t jerrno::JERR_EFP_NOEFP            = 0x0d04;
 const uint32_t jerrno::JERR_EFP_EMPTY            = 0x0d05;
-const uint32_t jerrno::JERR_EFP_SYMLINK          = 0x0d06;
-const uint32_t jerrno::JERR_EFP_LSTAT            = 0x0d07;
-const uint32_t jerrno::JERR_EFP_BADFILETYPE      = 0x0d08;
-const uint32_t jerrno::JERR_EFP_FOPEN            = 0x0d09;
+const uint32_t jerrno::JERR_EFP_LSTAT            = 0x0d06;
+const uint32_t jerrno::JERR_EFP_BADFILETYPE      = 0x0d07;
+const uint32_t jerrno::JERR_EFP_FOPEN            = 0x0d08;
+const uint32_t jerrno::JERR_EFP_FWRITE           = 0x0d09;
+const uint32_t jerrno::JERR_EFP_MKDIR            = 0x0d0a;
 
 // Negative returns for some functions
 const int32_t jerrno::AIO_TIMEOUT                = -1;
@@ -140,6 +142,7 @@ jerrno::__init()
     _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found.";
     _err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented";
     _err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer";
+    _err_map[JERR__SYMLINK] = "JERR__SYMLINK: Symbolic link operation failed";
 
     // class jcntl
     _err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped 
journal.";
@@ -210,10 +213,11 @@ jerrno::__init()
     _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid 
partition directory";
     _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for 
given partition and empty file size";
     _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty";
-    _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation 
failed";
     _err_map[JERR_EFP_LSTAT] = "JERR_EFP_LSTAT: lstat() operation failed";
     _err_map[JERR_EFP_BADFILETYPE] = "JERR_EFP_BADFILETYPE: File type 
incorrect for operation";
     _err_map[JERR_EFP_FOPEN] = "JERR_EFP_FOPEN: Unable to fopen file for 
write";
+    _err_map[JERR_EFP_FWRITE] = "JERR_EFP_FWRITE: Write failed";
+    _err_map[JERR_EFP_MKDIR] = "JERR_EFP_MKDIR: Directory creation failed";
 
     //_err_map[] = "";
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.h?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
 Tue Jan 27 15:00:13 2015
@@ -60,6 +60,7 @@ namespace journal {
         static const uint32_t JERR__RECNFOUND;          ///< Record not found
         static const uint32_t JERR__NOTIMPL;            ///< Not implemented
         static const uint32_t JERR__NULL;               ///< Operation on null 
pointer
+        static const uint32_t JERR__SYMLINK;            ///< Symbolic Link 
operation failed
 
         // class jcntl
         static const uint32_t JERR_JCNTL_STOPPED;       ///< Operation on 
stopped journal
@@ -130,10 +131,11 @@ namespace journal {
         static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition 
directory
         static const uint32_t JERR_EFP_NOEFP;           ///< No EFP found for 
given partition and file size
         static const uint32_t JERR_EFP_EMPTY;           ///< EFP empty
-        static const uint32_t JERR_EFP_SYMLINK;         ///< Symbolic Link 
operation failed
         static const uint32_t JERR_EFP_LSTAT;           ///< lstat operation 
failed
         static const uint32_t JERR_EFP_BADFILETYPE;     ///< Bad file type
         static const uint32_t JERR_EFP_FOPEN;           ///< Unable to fopen 
file for write
+        static const uint32_t JERR_EFP_FWRITE;          ///< Write failed
+        static const uint32_t JERR_EFP_MKDIR;           ///< Directory 
creation failed
 
         // Negative returns for some functions
         static const int32_t AIO_TIMEOUT;               ///< Timeout waiting 
for AIO return

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/management-schema.xml
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/management-schema.xml?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/management-schema.xml
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/management-schema.xml
 Tue Jan 27 15:00:13 2015
@@ -21,38 +21,24 @@
 
   <class name="Store">
     <property name="brokerRef"               type="objId"  access="RO" 
references="qpid.Broker" index="y" parentRef="y"/>
-    <property name="location"                type="sstr"   access="RO"         
     desc="Logical directory on disk"/>
-    <!--property name="defaultInitialFileCount" type="uint16" access="RO" 
unit="file"  desc="Default number of files initially allocated to each 
journal"/-->
-    <!--property name="defaultDataFileSize"     type="uint32" access="RO" 
unit="RdPg"  desc="Default size of each journal data file"/-->
+    <property name="storeDir"                type="sstr"   access="RO"         
     desc="Logical directory on disk"/>
     <property name="tplIsInitialized"        type="bool"   access="RO"         
     desc="Transaction prepared list has been initialized by a transactional 
prepare"/>
     <property name="tplDirectory"            type="sstr"   access="RO"         
     desc="Transaction prepared list directory"/>
     <property name="tplWritePageSize"        type="uint32" access="RO" 
unit="byte"  desc="Page size in transaction prepared list write-page-cache"/>
     <property name="tplWritePages"           type="uint32" access="RO" 
unit="wpage" desc="Number of pages in transaction prepared list 
write-page-cache"/>
-    <!--property name="tplInitialFileCount"     type="uint16" access="RO" 
unit="file"  desc="Number of files initially allocated to transaction prepared 
list journal"/-->
-    <!--property name="tplDataFileSize"         type="uint32" access="RO" 
unit="byte"  desc="Size of each journal data file in transaction prepared list 
journal"/-->
-    <!--property name="tplCurrentFileCount"     type="uint32" access="RO" 
unit="file"  desc="Number of files currently allocated to transaction prepared 
list journal"/-->
 
     <statistic name="tplTransactionDepth"    type="hilo32"  unit="txn"    
desc="Number of currently enqueued prepared transactions"/>
     <statistic name="tplTxnPrepares"         type="count64" unit="record" 
desc="Total transaction prepares on transaction prepared list"/>
     <statistic name="tplTxnCommits"          type="count64" unit="record" 
desc="Total transaction commits on transaction prepared list"/>
     <statistic name="tplTxnAborts"           type="count64" unit="record" 
desc="Total transaction aborts on transaction prepared list"/>
-    <statistic name="tplOutstandingAIOs"     type="hilo32"  unit="aio_op" 
desc="Deprecated"/>
   </class>
 
   <class name="Journal">
     <property name="queueRef"           type="objId"  access="RO" 
references="qpid.Queue" isGeneralReference="y"/>
-    <property name="name"               type="sstr"   access="RC" index="y"/>
+    <property name="queueName"          type="sstr"   access="RC" index="y"/>
     <property name="directory"          type="sstr"   access="RO"              
desc="Directory containing journal files"/>
-    <property name="baseFileName"       type="sstr"   access="RO"              
desc="Deprecated"/>
     <property name="writePageSize"      type="uint32" access="RO" unit="byte"  
desc="Deprecated"/>
     <property name="writePages"         type="uint32" access="RO" unit="wpage" 
desc="Deprecated"/>
-    <property name="readPageSize"       type="uint32" access="RO" unit="byte"  
desc="Deprecated"/>
-    <property name="readPages"          type="uint32" access="RO" unit="rpage" 
desc="Deprecated"/>
-    <!--property name="initialFileCount"   type="uint16" access="RO" 
unit="file"  desc="Number of files initially allocated to this journal"/-->
-    <!--property name="autoExpand"         type="bool"   access="RO"           
   desc="Auto-expand enabled"/-->
-    <!--property name="currentFileCount"   type="uint16" access="RO" 
unit="file"  desc="Number of files currently allocated to this journal"/-->
-    <!--property name="maxFileCount"       type="uint16" access="RO" 
unit="file"  desc="Max number of files allowed for this journal"/-->
-    <!--property name="dataFileSize"       type="uint32" access="RO" 
unit="byte"  desc="Size of each journal data file"/-->
 
     <statistic name="recordDepth"       type="hilo32"  unit="record" 
desc="Number of currently enqueued records (durable messages)"/>
     <statistic name="enqueues"          type="count64" unit="record" 
desc="Total enqueued records on journal"/>
@@ -64,36 +50,5 @@
     <statistic name="txnAborts"         type="count64" unit="record" 
desc="Total transactional abort records on journal"/>
     <statistic name="outstandingAIOs"   type="hilo32"  unit="aio_op" 
desc="Number of currently outstanding AIO requests in Async IO system"/>
 
-<!--
-    The following are not yet "wired up" in JournalImpl.cpp
--->
-    <statistic name="freeFileCount"       type="hilo32"  unit="file"   
desc="Deprecated"/>
-    <statistic name="availableFileCount"  type="hilo32"  unit="file"   
desc="Deprecated"/>
-    <statistic name="writeWaitFailures"   type="count64" unit="record" 
desc="Deprecated"/>
-    <statistic name="writeBusyFailures"   type="count64" unit="record" 
desc="Deprecated"/>
-    <statistic name="readRecordCount"     type="count64" unit="record" 
desc="Deprecated"/>
-    <statistic name="readBusyFailures"    type="count64" unit="record" 
desc="Deprecated"/>
-    <statistic name="writePageCacheDepth" type="hilo32"  unit="wpage"  
desc="Deprecated"/>
-    <statistic name="readPageCacheDepth"  type="hilo32"  unit="rpage"  
desc="Deprecated"/>
-
-    <!--method name="expand" desc="Increase number of files allocated for this 
journal">
-      <arg name="by" type="uint32" dir="I" desc="Number of files to increase 
journal size by"/>
-    </method-->
   </class>
-
-  <eventArguments>
-    <!--arg name="autoExpand" type="bool"   desc="Journal auto-expand 
enabled"/-->
-    <arg name="fileSize"   type="uint32" desc="Journal file size in bytes"/>
-    <arg name="jrnlId"     type="sstr"   desc="Journal Id"/>
-    <arg name="numEnq"     type="uint32" desc="Number of recovered enqueues"/>
-    <arg name="numFiles"   type="uint16" desc="Number of journal files"/>
-    <arg name="numTxn"     type="uint32" desc="Number of recovered 
transactions"/>
-    <arg name="numTxnDeq"  type="uint32" desc="Number of recovered 
transactional dequeues"/>
-    <arg name="numTxnEnq"  type="uint32" desc="Number of recovered 
transactional enqueues"/>
-    <arg name="what"       type="sstr"   desc="Description of event"/>
-  </eventArguments>
-  <event name="enqThresholdExceeded" sev="warn"   args="jrnlId, what"/>
-  <event name="created"              sev="notice" args="jrnlId, fileSize, 
numFiles"/>
-  <event name="full"                 sev="error"  args="jrnlId, what"/>
-  <event name="recovered"            sev="notice" args="jrnlId, fileSize, 
numFiles, numEnq, numTxn, numTxnEnq, numTxnDeq"/>
 </schema>

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
 Tue Jan 27 15:00:13 2015
@@ -242,7 +242,7 @@ bool replace(Variant::Map& map, const st
     }
 }
 
-const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes
+const uint32_t DEFAULT_DURABLE_TIMEOUT(2*60);//2 minutes
 const uint32_t DEFAULT_TIMEOUT(0);
 }
 
@@ -267,7 +267,7 @@ AddressHelper::AddressHelper(const Addre
     bind(link, RELIABILITY, reliability);
     durableNode = test(node, DURABLE);
     durableLink = test(link, DURABLE);
-    timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
+    timeout = get(link, TIMEOUT, durableLink && reliability != AT_LEAST_ONCE ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
     std::string mode;
     if (bind(address, MODE, mode)) {
         if (mode == BROWSE) {
@@ -571,7 +571,8 @@ bool AddressHelper::enabled(const std::s
 
 bool AddressHelper::isUnreliable() const
 {
-    return reliability == AT_MOST_ONCE || reliability == UNRELIABLE;
+    return reliability == AT_MOST_ONCE || reliability == UNRELIABLE ||
+        (reliability.empty() && browse); // A browser defaults to unreliable.
 }
 
 const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
 Tue Jan 27 15:00:13 2015
@@ -44,7 +44,6 @@ class AddressHelper
     const qpid::types::Variant::Map& getNodeProperties() const;
     bool getLinkSource(std::string& out) const;
     bool getLinkTarget(std::string& out) const;
-    bool getBrowse() const { return browse; }
     const qpid::types::Variant::Map& getLinkProperties() const;
     static std::string getLinkName(const Address& address);
   private:

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
 Tue Jan 27 15:00:13 2015
@@ -292,7 +292,7 @@ bool ConnectionContext::get(boost::share
             QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: ");
             encoded->init(impl);
             impl.setEncoded(encoded);
-            impl.setInternalId(ssn->record(current, lnk->getBrowse()));
+            impl.setInternalId(ssn->record(current));
             pn_link_advance(lnk->receiver);
             if (lnk->capacity) {
                 pn_link_flow(lnk->receiver, 1);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
 Tue Jan 27 15:00:13 2015
@@ -36,12 +36,10 @@ ReceiverContext::ReceiverContext(pn_sess
     address(a),
     helper(address),
     receiver(pn_receiver(session, name.c_str())),
-    capacity(0), used(0)
-{}
-
+    capacity(0), used(0) {}
 ReceiverContext::~ReceiverContext()
 {
-    //pn_link_free(receiver);
+    pn_link_free(receiver);
 }
 
 void ReceiverContext::setCapacity(uint32_t c)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
 Tue Jan 27 15:00:13 2015
@@ -60,8 +60,6 @@ class ReceiverContext
     void verify();
     Address getAddress() const;
     bool hasCurrent();
-    bool getBrowse() const { return helper.getBrowse(); }
-
   private:
     friend class ConnectionContext;
     const std::string name;

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
 Tue Jan 27 15:00:13 2015
@@ -30,6 +30,7 @@
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/MessageImpl.h"
 #include "qpid/log/Statement.h"
+#include "config.h"
 extern "C" {
 #include <proton/engine.h>
 }
@@ -49,7 +50,7 @@ SenderContext::SenderContext(pn_session_
 
 SenderContext::~SenderContext()
 {
-    //pn_link_free(sender);
+    pn_link_free(sender);
 }
 
 void SenderContext::close()
@@ -510,7 +511,11 @@ void SenderContext::Delivery::send(pn_li
 {
     pn_delivery_tag_t tag;
     tag.size = sizeof(id);
+#ifdef NO_PROTON_DELIVERY_TAG_T
+    tag.start = reinterpret_cast<const char*>(&id);
+#else
     tag.bytes = reinterpret_cast<const char*>(&id);
+#endif
     token = pn_delivery(sender, tag);
     pn_link_send(sender, encoded.getData(), encoded.getSize());
     if (unreliable) {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
 Tue Jan 27 15:00:13 2015
@@ -110,10 +110,11 @@ uint32_t SessionContext::getUnsettledAck
     return 0;//TODO
 }
 
-qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery, bool browse)
+qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
 {
     qpid::framing::SequenceNumber id = next++;
-    if (!browse) unacked[id] = delivery;
+    if (!pn_delivery_settled(delivery))
+        unacked[id] = delivery;
     QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery);
     return id;
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
 Tue Jan 27 15:00:13 2015
@@ -75,7 +75,7 @@ class SessionContext
     qpid::framing::SequenceNumber next;
     std::string name;
 
-    qpid::framing::SequenceNumber record(pn_delivery_t*, bool browse);
+    qpid::framing::SequenceNumber record(pn_delivery_t*);
     void acknowledge();
     void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
     void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp 
(original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp 
Tue Jan 27 15:00:13 2015
@@ -150,7 +150,7 @@ void AsynchIOHandler::readbuff(AsynchIO&
                 if (!codec) {
                     //TODO: may still want to revise this...
                     //send valid version header & close connection.
-                    write(framing::ProtocolInitiation(framing::highestProtocolVersion));
+                    write(framing::ProtocolInitiation(factory->supportedVersion()));
                     readError = true;
                     aio->queueWriteClose();
                 } else {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/ConnectionCodec.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/ConnectionCodec.h?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/ConnectionCodec.h 
(original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/ConnectionCodec.h 
Tue Jan 27 15:00:13 2015
@@ -60,6 +60,8 @@ class ConnectionCodec : public Codec {
         virtual ConnectionCodec* create(
             OutputControl&, const std::string& id, const SecuritySettings&
         ) = 0;
+
+        virtual framing::ProtocolVersion supportedVersion() const = 0;
     };
 };
 

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 27 15:00:13 2015
@@ -7,3 +7,4 @@
 /qpid/branches/java-network-refactor/qpid/cpp/src/tests:805429-825319
 /qpid/branches/qpid-2935/qpid/cpp/src/tests:1061302-1072333
 /qpid/branches/qpid-3346/qpid/cpp/src/tests:1144319-1179855
+/qpid/trunk/qpid/cpp/src/tests:1643238-1655056

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/assertions.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/assertions.py?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/assertions.py 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/assertions.py Tue 
Jan 27 15:00:13 2015
@@ -177,3 +177,18 @@ class AssertionTests (VersionTest):
             assert False, "Expected assertion to fail on unspecified option"
         except AssertionFailed: None
         except MessagingError: None
+
+    def test_queue_autodelete_timeout(self):
+        name = str(uuid4())
+        # create subscription queue with 0-10 to be sure of name
+        ssn_0_10 = self.create_connection("amqp0-10", True).session()
+        ssn_0_10.receiver("amq.direct; {link:{name:%s,timeout:30}}" % name)
+        self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name)
+        ssn_0_10_other = self.create_connection("amqp0-10", True).session()
+        ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name)
+        try:
+            self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name)
+            ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name)
+            assert False, "Expected assertion to fail for auto_delete_timeout"
+        except AssertionFailed: None
+        except MessagingError: None

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/txshift.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/txshift.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/txshift.cpp 
(original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/txshift.cpp Tue 
Jan 27 15:00:13 2015
@@ -40,7 +40,7 @@ namespace tests {
 struct Args : public qpid::TestOptions
 {
     std::string workQueue;
-    size_t workers;
+    uint workers;
 
     Args() : workQueue("txshift-control"), workers(1)
     {
@@ -178,7 +178,7 @@ int main(int argc, char** argv)
             worker.run();
         } else {
             boost::ptr_vector<Worker> workers;
-            for (size_t i = 0; i < opts.workers; i++) {
+            for (uint i = 0; i < opts.workers; i++) {
                 workers.push_back(new Worker(connection, opts.workQueue));
             }
             std::for_each(workers.begin(), workers.end(), boost::bind(&Worker::start, _1));

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/Java-Broker-Appendix-Operational-Logging-Messages.xml
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/Java-Broker-Appendix-Operational-Logging-Messages.xml?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/Java-Broker-Appendix-Operational-Logging-Messages.xml
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/Java-Broker-Appendix-Operational-Logging-Messages.xml
 Tue Jan 27 15:00:13 2015
@@ -378,6 +378,15 @@
             <para>Indicates that broker was shut down due to fatal 
error.</para>
           </entry>
         </row>
+        <row id="Java-Broker-Appendix-Operation-Logging-Message-BRK-1017">
+          <entry morerows="1">BRK-1017</entry>
+          <entry>Process : PID <replaceable>process 
identifier</replaceable></entry>
+        </row>
+        <row>
+          <entry>
+            <para>Process identifier (PID) of the Broker process.</para>
+          </entry>
+        </row>
       </tbody>
     </tgroup>
   </table>

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/commonEntities.xml
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/commonEntities.xml?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/commonEntities.xml
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/commonEntities.xml
 Tue Jan 27 15:00:13 2015
@@ -43,7 +43,7 @@
 <!ENTITY oracleBdbProductOverviewUrl 
"http://www.oracle.com/technetwork/products/berkeleydb/overview/index-093405.html";>
 <!ENTITY oracleBdbRepGuideUrl 
"http://oracle.com/cd/E17277_02/html/ReplicationGuide/";>
 <!ENTITY oracleBdbJavaDocUrl "http://docs.oracle.com/cd/E17277_02/html/java/";>
-<!ENTITY oracleBdbProductVersion "5.0.97">
+<!ENTITY oracleBdbProductVersion "5.0.104">
 
 <!ENTITY oracleJmxTutorial "http://docs.oracle.com/javase/tutorial/jmx/";>
 

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 27 15:00:13 2015
@@ -9,3 +9,4 @@
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
 /qpid/trunk/qpid:796646-796653
+/qpid/trunk/qpid/java:1643238-1655056

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 Tue Jan 27 15:00:13 2015
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
@@ -32,19 +33,20 @@ import javax.jms.InvalidDestinationExcep
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
+
 import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
 import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
 import org.apache.qpid.amqp_1_0.client.Message;
 import org.apache.qpid.amqp_1_0.client.Receiver;
 import org.apache.qpid.amqp_1_0.client.Transaction;
 import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
+import org.apache.qpid.amqp_1_0.jms.MessageConsumerException;
 import org.apache.qpid.amqp_1_0.jms.Queue;
 import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
 import org.apache.qpid.amqp_1_0.jms.Session;
 import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
 import org.apache.qpid.amqp_1_0.jms.Topic;
 import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
-import org.apache.qpid.amqp_1_0.jms.MessageConsumerException;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.Symbol;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
@@ -333,10 +335,23 @@ public class MessageConsumerImpl impleme
             message.setFromTopic(_isTopicSubscriber);
             if(redelivery)
             {
+                UnsignedInteger failures = message.getDeliveryFailures();
+
                 if(!message.getJMSRedelivered())
                 {
                     message.setJMSRedelivered(true);
                 }
+
+                if(failures == null)
+                {
+                    message.setDeliveryFailures(UnsignedInteger.ONE);
+                }
+                else
+                {
+                    message.setDeliveryFailures(failures.add(UnsignedInteger.ONE));
+                }
+
+
             }
 
             return message;

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1655057&r1=1655056&r2=1655057&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
 Tue Jan 27 15:00:13 2015
@@ -530,6 +530,7 @@ public class Receiver implements Deliver
         {
             release(msg);
         }
+        _session.removeReceiver(this);
 
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to