Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp Tue Jan 27 15:00:13 2015 @@ -73,6 +73,7 @@ class ProtocolImpl : public BrokerContex qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&); boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&); + qpid::framing::ProtocolVersion supportedVersion() const; private: }; @@ -158,5 +159,10 @@ boost::shared_ptr<RecoverableMessage> Pr } } +qpid::framing::ProtocolVersion ProtocolImpl::supportedVersion() const +{ + return qpid::framing::ProtocolVersion(1,0); +} + }}} // namespace qpid::broker::amqp
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.cpp Tue Jan 27 15:00:13 2015 @@ -23,6 +23,7 @@ #include "qpid/log/Statement.h" #include <algorithm> #include <string.h> +#include "config.h" namespace qpid { namespace broker { @@ -126,7 +127,13 @@ bool OutgoingFromRelay::doWork() { relay->check(); relay->setCredit(pn_link_credit(link)); - return relay->send(link); + bool worked = relay->send(link); + pn_delivery_t *d = pn_link_current(link); + if (d && pn_delivery_writable(d)) { + handle(d); + return true; + } + return worked; } /** * Called when a delivery is writable @@ -163,7 +170,7 @@ void OutgoingFromRelay::handle(pn_delive /** * Signals that this link has been detached */ -void OutgoingFromRelay::detached() +void OutgoingFromRelay::detached(bool /*closed*/) { relay->detached(this); } @@ -221,7 +228,7 @@ uint32_t IncomingToRelay::getCredit() return relay->getCredit(); } -void IncomingToRelay::detached() +void IncomingToRelay::detached(bool /*closed*/) { relay->detached(this); } @@ -238,7 +245,11 @@ void BufferedTransfer::initIn(pn_link_t* //copy delivery tag pn_delivery_tag_t dt = pn_delivery_tag(d); tag.resize(dt.size); +#ifdef NO_PROTON_DELIVERY_TAG_T + ::memmove(&tag[0], dt.start, dt.size); +#else ::memmove(&tag[0], dt.bytes, dt.size); +#endif //set context pn_delivery_set_context(d, this); @@ -258,7 +269,11 @@ bool BufferedTransfer::settle() void BufferedTransfer::initOut(pn_link_t* link) { pn_delivery_tag_t dt; +#ifdef NO_PROTON_DELIVERY_TAG_T + dt.start = &tag[0]; +#else dt.bytes = &tag[0]; +#endif dt.size = tag.size(); out.handle = pn_delivery(link, dt); //set context Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.h?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Relay.h Tue Jan 27 15:00:13 2015 @@ -100,7 +100,7 @@ class OutgoingFromRelay : public Outgoin const std::string& target, const std::string& name, boost::shared_ptr<Relay>); bool doWork(); void handle(pn_delivery_t* delivery); - void detached(); + void detached(bool closed); void init(); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); @@ -118,7 +118,7 @@ class IncomingToRelay : public Incoming bool settle(); bool doWork(); bool haveWork(); - void detached(); + void detached(bool closed); void readable(pn_delivery_t* delivery); uint32_t getCredit(); private: Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker/amqp/Session.cpp Tue Jan 27 15:00:13 2015 @@ -48,6 +48,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" +#include "config.h" #include <boost/intrusive_ptr.hpp> #include <boost/format.hpp> #include <map> @@ -505,10 +506,11 @@ void Session::setupOutgoing(pn_link_t* l if (!settings.autodelete) settings.autodelete = autodelete; altExchange = node.topic->getAlternateExchange(); } - if (!settings.autoDeleteDelay) { + if (settings.original.find("qpid.auto_delete_timeout") == settings.original.end()) { //only use delay from link if policy didn't specify one settings.autoDeleteDelay = pn_terminus_get_timeout(source); - settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay; + if (settings.autoDeleteDelay) + settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay; } if (settings.autoDeleteDelay) { settings.autodelete = true; @@ -577,7 +579,7 @@ void Session::detach(pn_link_t* link) if (pn_link_is_sender(link)) { OutgoingLinks::iterator i = outgoing.find(link); if (i != outgoing.end()) { - i->second->detached(); + i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/); boost::shared_ptr<Queue> q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get()); if (q && !q->isAutoDelete() && !q->isDeleted()) { connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId()); @@ -588,7 +590,7 @@ void Session::detach(pn_link_t* link) } else { IncomingLinks::iterator i = incoming.find(link); if (i != incoming.end()) { - i->second->detached(); + i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/); incoming.erase(i); QPID_LOG(debug, "Incoming link detached"); } @@ -615,7 +617,11 @@ void Session::accepted(pn_delivery_t* de void Session::readable(pn_link_t* link, pn_delivery_t* delivery) { pn_delivery_tag_t tag = pn_delivery_tag(delivery); +#ifdef NO_PROTON_DELIVERY_TAG_T + QPID_LOG(debug, "received delivery: " << std::string(tag.start, tag.size)); +#else QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size)); +#endif incomingMessageReceived(); IncomingLinks::iterator target = incoming.find(link); if (target == incoming.end()) { @@ -653,7 +659,7 @@ bool Session::dispatch() pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(s->first); - s->second->detached(); + s->second->detached(true); outgoing.erase(s++); output = true; } @@ -678,7 +684,7 @@ bool Session::dispatch() pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(i->first); - i->second->detached(); + i->second->detached(true); incoming.erase(i++); output = true; } @@ -690,10 +696,10 @@ bool Session::dispatch() void Session::close() { for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { - i->second->detached(); + i->second->detached(false); } for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) { - i->second->detached(); + i->second->detached(false); } outgoing.clear(); incoming.clear(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Tue Jan 27 15:00:13 2015 @@ -55,6 +55,7 @@ using qpid::messaging::AssertionFailed; using qpid::framing::ExchangeBoundResult; using qpid::framing::ExchangeQueryResult; using qpid::framing::FieldTable; +using qpid::framing::FieldValue; using qpid::framing::QueueQueryResult; using qpid::framing::ReplyTo; using qpid::framing::Uuid; @@ -140,6 +141,11 @@ const std::string PREFIX_AMQ("amq."); const std::string PREFIX_QPID("qpid."); const Verifier verifier; + +bool areEquivalent(const FieldValue& a, const FieldValue& b) +{ + return ((a == b) || (a.convertsTo<int64_t>() && b.convertsTo<int64_t>() && a.get<int64_t>() == b.get<int64_t>())); +} } struct Binding @@ -534,19 +540,19 @@ Subscription::Subscription(const Address reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)), actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), - autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(true)), + autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(!(durable || reliable))), exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)), alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str()) { - const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; - if (timeout) { + + if ((Opt(address)/LINK).hasKey(TIMEOUT)) { + const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32()); - } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) { - //if durable but not explicitly reliable, then set a non-zero - //default for the autodelete timeout (previously this would - //have defaulted to autodelete immediately anyway, so the risk - //of the change causing problems is mitigated) - queueOptions.setInt("qpid.auto_delete_delay", 15*60); + } else if (durable && !reliable && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) { + //if durable but not reliable, and auto-delete not + //explicitly set, then set a non-zero default for the + //autodelete timeout + queueOptions.setInt("qpid.auto_delete_timeout", 2*60); } (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); @@ -609,7 +615,7 @@ void Subscription::subscribe(qpid::clien //create subscription queue: session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, - arg::autoDelete=autoDeleteQueue && (!(durable || reliable)), arg::durable=durable, + arg::autoDelete=autoDeleteQueue, arg::durable=durable, arg::alternateExchange=alternateExchange, arg::arguments=queueOptions); //'default' binding: @@ -806,7 +812,7 @@ void Queue::checkAssert(qpid::client::As FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (*i->second != *v) { + } else if (!areEquivalent(*i->second, *v)) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } @@ -906,7 +912,7 @@ void Exchange::checkAssert(qpid::client: FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (*i->second != *v) { + } else if (!areEquivalent(*i->second, *v)) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/framing/SequenceSet.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/framing/SequenceSet.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/framing/SequenceSet.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/framing/SequenceSet.cpp Tue Jan 27 15:00:13 2015 @@ -33,7 +33,18 @@ namespace framing { namespace { //each range contains 2 numbers, 4 bytes each -uint16_t RANGE_SIZE = 2 * 4; +uint16_t RANGE_SIZE = 2 * 4; +int32_t MAX_RANGE = 2147483647;//2^31-1 + +int32_t gap(const SequenceNumber& a, const SequenceNumber& b) +{ + return a < b ? b - a : a - b; +} + +bool is_max_range(const SequenceNumber& a, const SequenceNumber& b) +{ + return gap(a, b) == MAX_RANGE; +} } void SequenceSet::encode(Buffer& buffer) const @@ -54,7 +65,21 @@ void SequenceSet::decode(Buffer& buffer) throw IllegalArgumentException(QPID_MSG("Invalid size for sequence set: " << size)); for (uint16_t i = 0; i < count; i++) { - add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong())); + SequenceNumber a(buffer.getLong()); + SequenceNumber b(buffer.getLong()); + if (b < a) + throw IllegalArgumentException(QPID_MSG("Invalid range in sequence set: " << a << " -> " << b)); + if (is_max_range(a, b)) { + //RangeSet holds 'half-closed' ranges, where the end is + //one past the 'highest' value in the range. So if the + //range is already the maximum expressable with a 32bit + //sequence number, we can't represent it as a + //'half-closed' range, so we represent it as two ranges. + add(a, b-1); + add(b); + } else { + add(a, b); + } } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/ha/Primary.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/ha/Primary.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/ha/Primary.cpp Tue Jan 27 15:00:13 2015 @@ -482,6 +482,7 @@ shared_ptr<PrimaryTxObserver> Primary::m { shared_ptr<PrimaryTxObserver> observer = PrimaryTxObserver::create(*this, haBroker, txBuffer); + sys::Mutex::ScopedLock l(lock); txMap[observer->getTxQueue()->getName()] = observer; return observer; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/ISSUES URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/ISSUES (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/ISSUES Tue Jan 27 15:00:13 2015 @@ -25,7 +25,7 @@ Current/pending: ------ ------- ---------------------- 5359 - Linearstore: Implement new management schema and wire into store 5360 - Linearstore: Evaluate and rework logging to produce a consistent log output - 5361 1145359 Linearstore: No tests for linearstore functionality currently exist +* 5361 1145359 Linearstore: No tests for linearstore functionality currently exist svn r.1564893 2014-02-05: Added tx-test-soak.sh svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore @@ -37,15 +37,22 @@ Current/pending: ** Basic performance tests 5464 - [linearstore] Incompletely created journal files accumulate in EFP - 1088944 [Linearstore] store does not return all files to EFP after purging big queue <queue purge issue> - 6043 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation +* - 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation - 1067480 [LinearStore] Provide a way to limit max count/size of empty files in EFP - 1067429 [LinearStore] last file from deleted queue is not moved to EFP <queue delete issue> - - 1067482 [LinearStore] Provide a way to prealocate empty pages in EFP -* 5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs - svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks - svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new - 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal - svn r.1643053 2014-11-18: Proposed fix + - 1067482 [LinearStore] Provide a way to preallocate empty pages in EFP +* 6303 1180660 [linearstore] Roll back auto-upgrade of store directory structure +* 5362 1145363 Linearstore: No store tools exist for examining the journals + svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up. + svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze + svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze + svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze + svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze + svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze + * Store analysis and status + * Recovery/reading of message content + * Empty file pool status and management + @@ -115,16 +122,6 @@ NO-JIRA - Added missing Apache co 5651 - [C++ broker] segfault in qpid::linearstore::journal::jdir::clear_dir when declaring durable queue svn r.1582730 2014-03-28 Proposed fix by Pavel Moravec * Bug introduced by r.1578899. - 5362 1145363 Linearstore: No store tools exist for examining the journals - svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up. - svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze - svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze - svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze - svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze - svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze - * Store analysis and status - * Recovery/reading of message content - * Empty file pool status and management 5661 - [linearstore] Set default cmake build to exclude linearstore svn r.1584379 2014-04-03 Proposed solution. * Run ccmake, select BUILD_LINEARSTORE to change its value to ON to build. @@ -147,8 +144,16 @@ NO-JIRA - Added missing Apache co svn r.1631360 2014-10-13 Proposed solution 6157 1150397 linearstore: segfault when 2 journals request new journal file from empty EFP svn r.1632504 2014-10-17 Proposed solution by pmoravec - 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute + 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal + svn r.1643053 2014-11-18: Proposed fix + 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute svn r.1641689 2014-11-25 Proposed solution + 5671 1160367 [linearstore] Add ability to use disk partitions and select per-queue EFPs + svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks + svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new + svn r.1649081 2015-01-02: WIP: Specify new queue using qpid-config --durable together with --efp-partition-num and/or --efp-pool-file-size. Needs testing. + - 1148807 [linearstore] Restarting broker with empty journal raises confusing warning + Fixed by svn r.1649081 of bug 5671 / 1160367 above Ordered checkin list: @@ -187,14 +192,17 @@ no. svn r Q-JIRA RHBZ Date 28. 1596509 5767 1098118 2014-05-21 0.22-mrg (pmoravec) 29. 1596633 NO-JIRA 1078937 2014-05-21 (includes tools install update) 30. 1599243 5767 1098118 2014-06-02 0.22-mrg -30. 1599243 5767 1098118 2014-06-02 -31. 1614665 5924 1124906 2014-07-30 -32. 1620426 6043 1089652 2014-08-25 -33. 1631360 6147 1152012 2014-10-13 (pmoravec) -34. 1632504 6157 1150397 2014-10-17 (pmoravec) -35. 1636598 5671 2014-11-04 -36. 1637985 5671 2014-11-10 -37. 1641689 6248 1167911 2014-11-25 +31. 1599243 5767 1098118 2014-06-02 +32. 1614665 5924 1124906 2014-07-30 +33. 1620426 6043 1089652 2014-08-25 +34. 1631360 6147 1152012 2014-10-13 (pmoravec) +35. 1632504 6157 1150397 2014-10-17 (pmoravec) +36. 1636598 5671 1160367 2014-11-04 +37. 1637985 5671 1160367 2014-11-10 +38. 1643053 6230 1165200 2014-11-18 +39. 1641689 6248 1167911 2014-11-25 +40. 1649081 5671 1160367 2015-01-02 +41. 1649082 NO-JIRA - 2015-01-02 See above sections for details on these checkins. Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Tue Jan 27 15:00:13 2015 @@ -148,7 +148,7 @@ void MessageStoreImpl::initManagement () mgmtObject = qmf::org::apache::qpid::linearstore::Store::shared_ptr ( new qmf::org::apache::qpid::linearstore::Store(agent, this, broker)); - mgmtObject->set_location(storeDir); + mgmtObject->set_storeDir(storeDir); mgmtObject->set_tplIsInitialized(false); mgmtObject->set_tplDirectory(getTplBaseDir()); mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * QLS_SBLK_SIZE_BYTES); @@ -406,7 +406,7 @@ void MessageStoreImpl::create(qpid::brok if (queue_.getName().size() == 0) { - QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); + QLS_LOG(error, "Cannot create store for empty (null) queue name - queue create ignored."); return; } @@ -449,15 +449,15 @@ qpid::linearstore::journal::EmptyFilePoo MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { qpid::framing::FieldTable::ValuePtr value; qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; - value = args_.get("qpid.efp_partition"); + value = args_.get("qpid.efp_partition_num"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { - localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition"); + localEfpPartition = chkEfpPartition((uint32_t)value->get<int>(), "qpid.efp_partition_num"); } qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib; - value = args_.get("qpid.efp_file_size"); + value = args_.get("qpid.efp_pool_file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo<int>()) { - localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(),"qpid.efp_file_size" ); + localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get<int>(), "qpid.efp_pool_file_size"); } return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib); } @@ -1488,21 +1488,21 @@ std::string MessageStoreImpl::getStoreTo std::string MessageStoreImpl::getJrnlBaseDir() { std::ostringstream dir; - dir << storeDir << "/" << storeTopLevelDir << "/jrnl/" ; + dir << storeDir << "/" << storeTopLevelDir << "/jrnl2/" ; return dir.str(); } std::string MessageStoreImpl::getBdbBaseDir() { std::ostringstream dir; - dir << storeDir << "/" << storeTopLevelDir << "/dat/" ; + dir << storeDir << "/" << storeTopLevelDir << "/dat2/" ; return dir.str(); } std::string MessageStoreImpl::getTplBaseDir() { std::ostringstream dir; - dir << storeDir << "/" << storeTopLevelDir << "/tpl/" ; + dir << storeDir << "/" << storeTopLevelDir << "/tpl2/" ; return dir.str(); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp Tue Jan 27 15:00:13 2015 @@ -59,10 +59,16 @@ EmptyFilePool::EmptyFilePool(const std:: EmptyFilePool::~EmptyFilePool() {} void EmptyFilePool::initialize() { -//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG - std::vector<std::string> dirList; + if (::mkdir(efpDirectory_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { // Create EFP dir if it does not yet exist + if (errno != EEXIST) { + std::ostringstream oss; + oss << "directory=" << efpDirectory_ << " " << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_EFP_MKDIR, oss.str(), "EmptyFilePool", "initialize"); + } + } // Process empty files in main dir + std::vector<std::string> dirList; jdir::read_dir(efpDirectory_, dirList, false, true, false, false); for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { size_t dotPos = i->rfind("."); @@ -122,14 +128,14 @@ const efpIdentity_t EmptyFilePool::getId std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { std::string emptyFileName = popEmptyFile(); - std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); + std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(emptyFileName, newFileName)) { + if (!moveFile(emptyFileName, newFileName)) { // Try again with new UUID for file name newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName(); - if (moveFile(emptyFileName, newFileName)) { + if (!moveFile(emptyFileName, newFileName)) { //std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG - pushEmptyFile(emptyFileName); + pushEmptyFile(emptyFileName); // Return empty file to pool std::ostringstream oss; oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile"); @@ -138,7 +144,7 @@ std::string EmptyFilePool::takeEmptyFile if (createSymLink(newFileName, symlinkName)) { std::ostringstream oss; oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile"); } return symlinkName; } @@ -189,12 +195,27 @@ efpDataSize_kib_t EmptyFilePool::dataSiz } // --- protected functions --- +void EmptyFilePool::checkIosState(std::ofstream& ofs, + const uint32_t jerrno, + const std::string& fqFileName, + const std::string& operation, + const std::string& errorMessage, + const std::string& className, + const std::string& fnName) { + if (!ofs.good()) { + if (ofs.is_open()) { + ofs.close(); + } + std::ostringstream oss; + oss << "IO failure: eofbit=" << (ofs.eof()?"T":"F") << " failbit=" << (ofs.fail()?"T":"F") << " badbit=" + << (ofs.bad()?"T":"F") << " file=" << fqFileName << " operation=" << operation << ": " << errorMessage; + throw jexception(jerrno, oss.str(), className, fnName); + } +} std::string EmptyFilePool::createEmptyFile() { std::string efpfn = getEfpFileName(); - if (!overwriteFileContents(efpfn)) { - // TODO: handle failure to prepare new file here - } + overwriteFileContents(efpfn); return efpfn; } @@ -226,24 +247,20 @@ void EmptyFilePool::initializeSubDirecto } } -bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) { +void EmptyFilePool::overwriteFileContents(const std::string& fqFileName) { ::file_hdr_t fh; ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_); std::ofstream ofs(fqFileName.c_str(), std::ofstream::out | std::ofstream::binary); - if (ofs.good()) { - ofs.write((char*)&fh, sizeof(::file_hdr_t)); - uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); - while (rem--) - ofs.put('\0'); - ofs.close(); - return true; -//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG - } else { - std::ostringstream oss; - oss << "std::ofstream ofs: file=\"" << fqFileName.c_str() << "\"" << " failed to be open"; - throw jexception(jerrno::JERR_EFP_FOPEN, oss.str(), "EmptyFilePool", "overwriteFileContents"); + checkIosState(ofs, jerrno::JERR_EFP_FOPEN, fqFileName, "constructor", "Failed to create file", "EmptyFilePool", "overwriteFileContents"); + ofs.write((char*)&fh, sizeof(::file_hdr_t)); + checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "write()", "Failed to write header", "EmptyFilePool", "overwriteFileContents"); + uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); + while (rem--) { + ofs.put('\0'); + checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "put()", "Failed to put \0", "EmptyFilePool", "overwriteFileContents"); } - return false; + ofs.close(); +//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG } std::string EmptyFilePool::popEmptyFile() { @@ -271,7 +288,7 @@ void EmptyFilePool::pushEmptyFile(const void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) { std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(emptyFileName, returnedFileName)) { + if (!moveFile(emptyFileName, returnedFileName)) { ::unlink(emptyFileName.c_str()); //std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG } @@ -283,7 +300,7 @@ void EmptyFilePool::returnEmptyFile(cons overwriteFileContents(returnedFileName); } std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(returnedFileName, sanitizedEmptyFileName)) { + if (!moveFile(returnedFileName, sanitizedEmptyFileName)) { ::unlink(returnedFileName.c_str()); //std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG } else { @@ -395,18 +412,6 @@ bool EmptyFilePool::validateEmptyFile(co return true; } -// static -int EmptyFilePool::moveFile(const std::string& from, - const std::string& to) { - if (::rename(from.c_str(), to.c_str())) { - if (errno == EEXIST) return errno; // File name exists - std::ostringstream oss; - oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); - } - return 0; -} - //static int EmptyFilePool::createSymLink(const std::string& fqFileName, const std::string& fqLinkName) { @@ -414,7 +419,7 @@ int EmptyFilePool::createSymLink(const s if (errno == EEXIST) return errno; // File name exists std::ostringstream oss; oss << "file=\"" << fqFileName << "\" symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "createSymLink"); } return 0; } @@ -426,7 +431,7 @@ std::string EmptyFilePool::deleteSymlink if (len < 0) { std::ostringstream oss; oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink"); } ::unlink(fqLinkName.c_str()); return std::string(buff, len); @@ -455,4 +460,18 @@ bool EmptyFilePool::isSymlink(const std: } +// static +bool EmptyFilePool::moveFile(const std::string& from, + const std::string& to) { + if (::rename(from.c_str(), to.c_str())) { + if (errno == EEXIST) { + return false; // File name exists + } + std::ostringstream oss; + oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); + } + return true; +} + }}} Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h Tue Jan 27 15:00:13 2015 @@ -87,23 +87,30 @@ public: const efpPartitionNumber_t partitionNumber); protected: + void checkIosState(std::ofstream& ofs, + const uint32_t jerrno, + const std::string& fqFileName, + const std::string& operation, + const std::string& errorMessage, + const std::string& className, + const std::string& fnName); std::string createEmptyFile(); std::string getEfpFileName(); void initializeSubDirectory(const std::string& fqDirName); - bool overwriteFileContents(const std::string& fqFileName); + void overwriteFileContents(const std::string& fqFileName); std::string popEmptyFile(); void pushEmptyFile(const std::string fqFileName); void returnEmptyFile(const std::string& emptyFileName); void resetEmptyFileHeader(const std::string& fqFileName); bool validateEmptyFile(const std::string& emptyFileName) const; - static int moveFile(const std::string& fromFqPath, - const std::string& toFqPath); static int createSymLink(const std::string& fqFileName, const std::string& fqLinkName); static std::string deleteSymlink(const std::string& fqLinkName); static bool isFile(const std::string& fqName); static bool isSymlink(const std::string& fqName); + static bool moveFile(const std::string& fromFqPath, + const std::string& toFqPath); }; }}} Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp Tue Jan 27 15:00:13 2015 @@ -74,6 +74,7 @@ void EmptyFilePoolManager::findEfpPartit if (!foundPartition) { std::ostringstream oss1; oss1 << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_) + << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_); jdir::create_dir(oss1.str()); insertPartition(defaultPartitionNumber_, oss1.str()); @@ -165,9 +166,10 @@ EmptyFilePool* EmptyFilePoolManager::get EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpDataSize_kib_t efpDataSize_kib) { EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber > 0 ? partitionNumber : defaultPartitionNumber_); - if (efppp != 0) - return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_); - return 0; + if (efppp == 0) { + return 0; + } + return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_, true); } void EmptyFilePoolManager::getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList, Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp Tue Jan 27 15:00:13 2015 @@ -32,6 +32,9 @@ namespace qpid { namespace linearstore { namespace journal { +// static +const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition + EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum, const std::string& partitionDir, const bool overwriteBeforeReturnFlag, @@ -57,72 +60,31 @@ EmptyFilePoolPartition::~EmptyFilePoolPa void EmptyFilePoolPartition::findEmptyFilePools() { //std::cout << "*** EmptyFilePoolPartition::findEmptyFilePools(): Reading " << partitionDir_ << std::endl; // DEBUG - std::vector<std::string> dirList; - bool upgradeDirStructureFlag = false; - std::string oldPartitionDir; - jdir::read_dir(partitionDir_, dirList, true, false, false, false); -//std::cout << "*** dirList.size()=" << dirList.size() << "; dirList.front()=" << dirList.front() << std::endl; // DEBUG - if (dirList.size() == 1 && dirList.front().compare("efp") == 0) { - upgradeDirStructureFlag = true; - oldPartitionDir = partitionDir_ + "/efp"; -//std::cout << "*** oldPartitionDir=" << oldPartitionDir << std::endl; // DEBUG - dirList.clear(); - jdir::read_dir(oldPartitionDir, dirList, true, false, false, false); - } + std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_); + if (jdir::is_dir(efpDir)) { + std::vector<std::string> dirList; + jdir::read_dir(efpDir, dirList, true, false, false, true); for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) { - std::string fqFileName(partitionDir_ + "/" + *i); - if (upgradeDirStructureFlag) { - std::string fqOldFileName(partitionDir_ + "/efp/" + *i); - if (::rename(fqOldFileName.c_str(), fqFileName.c_str())) { - // File move failed - std::ostringstream oss; - oss << "File \'" << fqOldFileName << "\' could not be renamed to \'" << fqFileName << "\' (" << FORMAT_SYSERR(errno) << "); file deleted"; - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - if (::unlink(fqOldFileName.c_str())) { - std::ostringstream oss; - oss << "File \'" << fqOldFileName << "\' could not be deleted (" << FORMAT_SYSERR(errno) << "\'; file orphaned"; - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } - } - } - EmptyFilePool* efpp = 0; - try { - efpp = new EmptyFilePool(fqFileName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); - { - slock l(efpMapMutex_); - efpMap_[efpp->dataSize_kib()] = efpp; - } - } - catch (const std::exception& e) { - if (efpp != 0) { - delete efpp; - efpp = 0; - } - std::ostringstream oss; - oss << "EmptyFilePool create failed: " << e.what(); - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } - if (efpp != 0) { - efpp->initialize(); - } - } - if (upgradeDirStructureFlag) { - std::string oldEfpDir(partitionDir_ + "/efp"); - if (::rmdir(oldEfpDir.c_str())) { - // Unable to delete old "efp" dir - std::ostringstream oss; - oss << "Unable to delete old EFP directory \'" << oldEfpDir << "\' (" << FORMAT_SYSERR(errno) << "\'; directory orphaned"; - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } + createEmptyFilePool(*i); } + } else { + std::ostringstream oss; + oss << "Partition \"" << partitionDir_ << "\" does not contain top level EFP dir \"" << s_efpTopLevelDir_ << "\""; + journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); + } } -EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) { - slock l(efpMapMutex_); - efpMapItr_t i = efpMap_.find(efpDataSize_kib); - if (i == efpMap_.end()) - return 0; - return i->second; +EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent) { + { + slock l(efpMapMutex_); + efpMapItr_t i = efpMap_.find(efpDataSize_kib); + if (i != efpMap_.end()) + return i->second; + } + if (createIfNonExistent) { + return createEmptyFilePool(efpDataSize_kib); + } + return 0; } void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) { @@ -183,7 +145,7 @@ std::string EmptyFilePoolPartition::getP //static efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::string& name) { if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && ::isdigit(name[2]) && ::isdigit(name[3])) { - long pn = ::strtol(name.c_str() + 1, 0, 0); + long pn = ::strtol(name.c_str() + 1, 0, 10); if (pn == 0 && errno) { return 0; } else { @@ -195,12 +157,42 @@ efpPartitionNumber_t EmptyFilePoolPartit // --- protected functions --- +EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) { + std::string fqEfpDirectoryName(partitionDir_ + "/" + EmptyFilePoolPartition::s_efpTopLevelDir_ + "/" + EmptyFilePool::dirNameFromDataSize(efpDataSize_kib)); + return createEmptyFilePool(fqEfpDirectoryName); +} + +EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const std::string fqEfpDirectoryName) { + EmptyFilePool* efpp = 0; + try { + efpp = new EmptyFilePool(fqEfpDirectoryName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); + { + slock l(efpMapMutex_); + efpMap_[efpp->dataSize_kib()] = efpp; + } + } + catch (const std::exception& e) { + if (efpp != 0) { + delete efpp; + efpp = 0; + } + std::ostringstream oss; + oss << "EmptyFilePool create failed: " << e.what(); + journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); + } + if (efpp != 0) { + efpp->initialize(); + } + return efpp; +} + void EmptyFilePoolPartition::validatePartitionDir() { + std::ostringstream ss; if (!jdir::is_dir(partitionDir_)) { - std::ostringstream ss; ss << "Invalid partition directory: \'" << partitionDir_ << "\' is not a directory"; throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir"); } + // TODO: other validity checks here } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h Tue Jan 27 15:00:13 2015 @@ -37,6 +37,8 @@ class JournalLog; class EmptyFilePoolPartition { +public: + static const std::string s_efpTopLevelDir_; protected: typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t; typedef efpMap_t::iterator efpMapItr_t; @@ -59,7 +61,7 @@ public: virtual ~EmptyFilePoolPartition(); void findEmptyFilePools(); - EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib); + EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent); void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList); void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const; std::string getPartitionDirectory() const; @@ -70,6 +72,8 @@ public: static efpPartitionNumber_t getPartitionNumber(const std::string& name); protected: + EmptyFilePool* createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib); + EmptyFilePool* createEmptyFilePool(const std::string fqEfpDirectoryName); void validatePartitionDir(); }; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Tue Jan 27 15:00:13 2015 @@ -43,6 +43,7 @@ #include "qpid/linearstore/journal/utils/file_hdr.h" #include <sstream> #include <string> +#include <unistd.h> #include <vector> namespace qpid { @@ -101,7 +102,11 @@ void RecoveryManager::analyzeJournals(co analyzeJournalFileHeaders(efpIdentity); if (journalEmptyFlag_) { - *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP + if (uninitFileList_.empty()) { + *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP + } else { + *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); + } } else { *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); if (! *emptyFilePoolPtrPtr) { @@ -294,6 +299,7 @@ void RecoveryManager::setLinearFileContr LinearFileController* lfcPtr) { if (journalEmptyFlag_) { if (uninitFileList_.size() > 0) { + // TODO: Handle case if uninitFileList_.size() > 1, but this should not happen in normal operation. Here we assume only one item in the list. std::string uninitFile = uninitFileList_.back(); uninitFileList_.pop_back(); lfcPtr->restoreEmptyFile(uninitFile); @@ -377,11 +383,28 @@ void RecoveryManager::analyzeJournalFile jdir::read_dir(journalDirectory_, directoryList, false, true, false, true); for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { bool hdrOk = readJournalFileHeader(*i, fileHeader, headerQueueName); - if (!hdrOk || headerQueueName.empty()) { + bool hdrEmpty = ::is_file_hdr_reset(&fileHeader); + if (!hdrOk) { std::ostringstream oss; - oss << "Journal file " << (*i) << " is uninitialized or corrupted"; + oss << "Journal file " << (*i) << " is corrupted or invalid"; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); + } else if (hdrEmpty) { + // Read symlink, find efp directory name which is efp size in KiB + // TODO: place this bit into a common function as it is also used in EmptyFilePool.cpp::deleteSymlink() + char buff[1024]; + ssize_t len = ::readlink((*i).c_str(), buff, 1024); + if (len < 0) { + std::ostringstream oss; + oss << "symlink=\"" << (*i) << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "RecoveryManager", "analyzeJournalFileHeaders"); + } + // Find second and third '/' from back of string, which contains the EFP directory name + *(::strrchr(buff, '/')) = '\0'; + *(::strrchr(buff, '/')) = '\0'; + int efpDataSize_kib = atoi(::strrchr(buff, '/') + 1); uninitFileList_.push_back(*i); + efpIdentity.pn_ = fileHeader._efp_partition; + efpIdentity.ds_ = efpDataSize_kib; } else if (headerQueueName.compare(queueName_) != 0) { std::ostringstream oss; oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; @@ -406,6 +429,7 @@ void RecoveryManager::analyzeJournalFile } } +//std::cerr << "*** RecoveryManager::analyzeJournalFileHeaders() fileNumberMap_.size()=" << fileNumberMap_.size() << std::endl; // DEBUG if (fileNumberMap_.empty()) { journalEmptyFlag_ = true; } else { @@ -905,7 +929,9 @@ bool RecoveryManager::readJournalFileHea } ifs.close(); ::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t)); - if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) return false; + if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) { + return false; + } queueName.assign(buffer + sizeof(::file_hdr_t), fileHeaderRef._queue_name_len); return true; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp Tue Jan 27 15:00:13 2015 @@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__UNEXPRESPON const uint32_t jerrno::JERR__RECNFOUND = 0x0109; const uint32_t jerrno::JERR__NOTIMPL = 0x010a; const uint32_t jerrno::JERR__NULL = 0x010b; +const uint32_t jerrno::JERR__SYMLINK = 0x010c; // class jcntl const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200; @@ -112,10 +113,11 @@ const uint32_t jerrno::JERR_EFP_BADPARTI const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03; const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04; const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05; -const uint32_t jerrno::JERR_EFP_SYMLINK = 0x0d06; -const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d07; -const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d08; -const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d09; +const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d06; +const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d07; +const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d08; +const uint32_t jerrno::JERR_EFP_FWRITE = 0x0d09; +const uint32_t jerrno::JERR_EFP_MKDIR = 0x0d0a; // Negative returns for some functions const int32_t jerrno::AIO_TIMEOUT = -1; @@ -140,6 +142,7 @@ jerrno::__init() _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found."; _err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented"; _err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer"; + _err_map[JERR__SYMLINK] = "JERR__SYMLINK: Symbolic link operation failed"; // class jcntl _err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal."; @@ -210,10 +213,11 @@ jerrno::__init() _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory"; _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size"; _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty"; - _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed"; _err_map[JERR_EFP_LSTAT] = "JERR_EFP_LSTAT: lstat() operation failed"; _err_map[JERR_EFP_BADFILETYPE] = "JERR_EFP_BADFILETYPE: File type incorrect for operation"; _err_map[JERR_EFP_FOPEN] = "JERR_EFP_FOPEN: Unable to fopen file for write"; + _err_map[JERR_EFP_FWRITE] = "JERR_EFP_FWRITE: Write failed"; + _err_map[JERR_EFP_MKDIR] = "JERR_EFP_MKDIR: Directory creation failed"; //_err_map[] = ""; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.h?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/journal/jerrno.h Tue Jan 27 15:00:13 2015 @@ -60,6 +60,7 @@ namespace journal { static const uint32_t JERR__RECNFOUND; ///< Record not found static const uint32_t JERR__NOTIMPL; ///< Not implemented static const uint32_t JERR__NULL; ///< Operation on null pointer + static const uint32_t JERR__SYMLINK; ///< Symbolic Link operation failed // class jcntl static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal @@ -130,10 +131,11 @@ namespace journal { static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size static const uint32_t JERR_EFP_EMPTY; ///< EFP empty - static const uint32_t JERR_EFP_SYMLINK; ///< Symbolic Link operation failed static const uint32_t JERR_EFP_LSTAT; ///< lstat operation failed static const uint32_t JERR_EFP_BADFILETYPE; ///< Bad file type static const uint32_t JERR_EFP_FOPEN; ///< Unable to fopen file for write + static const uint32_t JERR_EFP_FWRITE; ///< Write failed + static const uint32_t JERR_EFP_MKDIR; ///< Directory creation failed // Negative returns for some functions static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/management-schema.xml URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/management-schema.xml?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/management-schema.xml (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/linearstore/management-schema.xml Tue Jan 27 15:00:13 2015 @@ -21,38 +21,24 @@ <class name="Store"> <property name="brokerRef" type="objId" access="RO" references="qpid.Broker" index="y" parentRef="y"/> - <property name="location" type="sstr" access="RO" desc="Logical directory on disk"/> - <!--property name="defaultInitialFileCount" type="uint16" access="RO" unit="file" desc="Default number of files initially allocated to each journal"/--> - <!--property name="defaultDataFileSize" type="uint32" access="RO" unit="RdPg" desc="Default size of each journal data file"/--> + <property name="storeDir" type="sstr" access="RO" desc="Logical directory on disk"/> <property name="tplIsInitialized" type="bool" access="RO" desc="Transaction prepared list has been initialized by a transactional prepare"/> <property name="tplDirectory" type="sstr" access="RO" desc="Transaction prepared list directory"/> <property name="tplWritePageSize" type="uint32" access="RO" unit="byte" desc="Page size in transaction prepared list write-page-cache"/> <property name="tplWritePages" type="uint32" access="RO" unit="wpage" desc="Number of pages in transaction prepared list write-page-cache"/> - <!--property name="tplInitialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to transaction prepared list journal"/--> - <!--property name="tplDataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file in transaction prepared list journal"/--> - <!--property name="tplCurrentFileCount" type="uint32" access="RO" unit="file" desc="Number of files currently allocated to transaction prepared list journal"/--> <statistic name="tplTransactionDepth" type="hilo32" unit="txn" desc="Number of currently enqueued prepared transactions"/> <statistic name="tplTxnPrepares" type="count64" unit="record" desc="Total transaction prepares on transaction prepared list"/> <statistic name="tplTxnCommits" type="count64" unit="record" desc="Total transaction commits on transaction prepared list"/> <statistic name="tplTxnAborts" type="count64" unit="record" desc="Total transaction aborts on transaction prepared list"/> - <statistic name="tplOutstandingAIOs" type="hilo32" unit="aio_op" desc="Deprecated"/> </class> <class name="Journal"> <property name="queueRef" type="objId" access="RO" references="qpid.Queue" isGeneralReference="y"/> - <property name="name" type="sstr" access="RC" index="y"/> + <property name="queueName" type="sstr" access="RC" index="y"/> <property name="directory" type="sstr" access="RO" desc="Directory containing journal files"/> - <property name="baseFileName" type="sstr" access="RO" desc="Deprecated"/> <property name="writePageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/> <property name="writePages" type="uint32" access="RO" unit="wpage" desc="Deprecated"/> - <property name="readPageSize" type="uint32" access="RO" unit="byte" desc="Deprecated"/> - <property name="readPages" type="uint32" access="RO" unit="rpage" desc="Deprecated"/> - <!--property name="initialFileCount" type="uint16" access="RO" unit="file" desc="Number of files initially allocated to this journal"/--> - <!--property name="autoExpand" type="bool" access="RO" desc="Auto-expand enabled"/--> - <!--property name="currentFileCount" type="uint16" access="RO" unit="file" desc="Number of files currently allocated to this journal"/--> - <!--property name="maxFileCount" type="uint16" access="RO" unit="file" desc="Max number of files allowed for this journal"/--> - <!--property name="dataFileSize" type="uint32" access="RO" unit="byte" desc="Size of each journal data file"/--> <statistic name="recordDepth" type="hilo32" unit="record" desc="Number of currently enqueued records (durable messages)"/> <statistic name="enqueues" type="count64" unit="record" desc="Total enqueued records on journal"/> @@ -64,36 +50,5 @@ <statistic name="txnAborts" type="count64" unit="record" desc="Total transactional abort records on journal"/> <statistic name="outstandingAIOs" type="hilo32" unit="aio_op" desc="Number of currently outstanding AIO requests in Async IO system"/> -<!-- - The following are not yet "wired up" in JournalImpl.cpp ---> - <statistic name="freeFileCount" type="hilo32" unit="file" desc="Deprecated"/> - <statistic name="availableFileCount" type="hilo32" unit="file" desc="Deprecated"/> - <statistic name="writeWaitFailures" type="count64" unit="record" desc="Deprecated"/> - <statistic name="writeBusyFailures" type="count64" unit="record" desc="Deprecated"/> - <statistic name="readRecordCount" type="count64" unit="record" desc="Deprecated"/> - <statistic name="readBusyFailures" type="count64" unit="record" desc="Deprecated"/> - <statistic name="writePageCacheDepth" type="hilo32" unit="wpage" desc="Deprecated"/> - <statistic name="readPageCacheDepth" type="hilo32" unit="rpage" desc="Deprecated"/> - - <!--method name="expand" desc="Increase number of files allocated for this journal"> - <arg name="by" type="uint32" dir="I" desc="Number of files to increase journal size by"/> - </method--> </class> - - <eventArguments> - <!--arg name="autoExpand" type="bool" desc="Journal auto-expand enabled"/--> - <arg name="fileSize" type="uint32" desc="Journal file size in bytes"/> - <arg name="jrnlId" type="sstr" desc="Journal Id"/> - <arg name="numEnq" type="uint32" desc="Number of recovered enqueues"/> - <arg name="numFiles" type="uint16" desc="Number of journal files"/> - <arg name="numTxn" type="uint32" desc="Number of recovered transactions"/> - <arg name="numTxnDeq" type="uint32" desc="Number of recovered transactional dequeues"/> - <arg name="numTxnEnq" type="uint32" desc="Number of recovered transactional enqueues"/> - <arg name="what" type="sstr" desc="Description of event"/> - </eventArguments> - <event name="enqThresholdExceeded" sev="warn" args="jrnlId, what"/> - <event name="created" sev="notice" args="jrnlId, fileSize, numFiles"/> - <event name="full" sev="error" args="jrnlId, what"/> - <event name="recovered" sev="notice" args="jrnlId, fileSize, numFiles, numEnq, numTxn, numTxnEnq, numTxnDeq"/> </schema> Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Tue Jan 27 15:00:13 2015 @@ -242,7 +242,7 @@ bool replace(Variant::Map& map, const st } } -const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes +const uint32_t DEFAULT_DURABLE_TIMEOUT(2*60);//2 minutes const uint32_t DEFAULT_TIMEOUT(0); } @@ -267,7 +267,7 @@ AddressHelper::AddressHelper(const Addre bind(link, RELIABILITY, reliability); durableNode = test(node, DURABLE); durableLink = test(link, DURABLE); - timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); + timeout = get(link, TIMEOUT, durableLink && reliability != AT_LEAST_ONCE ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); std::string mode; if (bind(address, MODE, mode)) { if (mode == BROWSE) { @@ -571,7 +571,8 @@ bool AddressHelper::enabled(const std::s bool AddressHelper::isUnreliable() const { - return reliability == AT_MOST_ONCE || reliability == UNRELIABLE; + return reliability == AT_MOST_ONCE || reliability == UNRELIABLE || + (reliability.empty() && browse); // A browser defaults to unreliable. } const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Tue Jan 27 15:00:13 2015 @@ -44,7 +44,6 @@ class AddressHelper const qpid::types::Variant::Map& getNodeProperties() const; bool getLinkSource(std::string& out) const; bool getLinkTarget(std::string& out) const; - bool getBrowse() const { return browse; } const qpid::types::Variant::Map& getLinkProperties() const; static std::string getLinkName(const Address& address); private: Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Tue Jan 27 15:00:13 2015 @@ -292,7 +292,7 @@ bool ConnectionContext::get(boost::share QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); encoded->init(impl); impl.setEncoded(encoded); - impl.setInternalId(ssn->record(current, lnk->getBrowse())); + impl.setInternalId(ssn->record(current)); pn_link_advance(lnk->receiver); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Tue Jan 27 15:00:13 2015 @@ -36,12 +36,10 @@ ReceiverContext::ReceiverContext(pn_sess address(a), helper(address), receiver(pn_receiver(session, name.c_str())), - capacity(0), used(0) -{} - + capacity(0), used(0) {} ReceiverContext::~ReceiverContext() { - //pn_link_free(receiver); + pn_link_free(receiver); } void ReceiverContext::setCapacity(uint32_t c) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Tue Jan 27 15:00:13 2015 @@ -60,8 +60,6 @@ class ReceiverContext void verify(); Address getAddress() const; bool hasCurrent(); - bool getBrowse() const { return helper.getBrowse(); } - private: friend class ConnectionContext; const std::string name; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Tue Jan 27 15:00:13 2015 @@ -30,6 +30,7 @@ #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/log/Statement.h" +#include "config.h" extern "C" { #include <proton/engine.h> } @@ -49,7 +50,7 @@ SenderContext::SenderContext(pn_session_ SenderContext::~SenderContext() { - //pn_link_free(sender); + pn_link_free(sender); } void SenderContext::close() @@ -510,7 +511,11 @@ void SenderContext::Delivery::send(pn_li { pn_delivery_tag_t tag; tag.size = sizeof(id); +#ifdef NO_PROTON_DELIVERY_TAG_T + tag.start = reinterpret_cast<const char*>(&id); +#else tag.bytes = reinterpret_cast<const char*>(&id); +#endif token = pn_delivery(sender, tag); pn_link_send(sender, encoded.getData(), encoded.getSize()); if (unreliable) { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Tue Jan 27 15:00:13 2015 @@ -110,10 +110,11 @@ uint32_t SessionContext::getUnsettledAck return 0;//TODO } -qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery, bool browse) +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) { qpid::framing::SequenceNumber id = next++; - if (!browse) unacked[id] = delivery; + if (!pn_delivery_settled(delivery)) + unacked[id] = delivery; QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); return id; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Tue Jan 27 15:00:13 2015 @@ -75,7 +75,7 @@ class SessionContext qpid::framing::SequenceNumber next; std::string name; - qpid::framing::SequenceNumber record(pn_delivery_t*, bool browse); + qpid::framing::SequenceNumber record(pn_delivery_t*); void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Tue Jan 27 15:00:13 2015 @@ -150,7 +150,7 @@ void AsynchIOHandler::readbuff(AsynchIO& if (!codec) { //TODO: may still want to revise this... //send valid version header & close connection. - write(framing::ProtocolInitiation(framing::highestProtocolVersion)); + write(framing::ProtocolInitiation(factory->supportedVersion())); readError = true; aio->queueWriteClose(); } else { Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/ConnectionCodec.h URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/ConnectionCodec.h?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/ConnectionCodec.h (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/ConnectionCodec.h Tue Jan 27 15:00:13 2015 @@ -60,6 +60,8 @@ class ConnectionCodec : public Codec { virtual ConnectionCodec* create( OutputControl&, const std::string& id, const SecuritySettings& ) = 0; + + virtual framing::ProtocolVersion supportedVersion() const = 0; }; }; Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jan 27 15:00:13 2015 @@ -7,3 +7,4 @@ /qpid/branches/java-network-refactor/qpid/cpp/src/tests:805429-825319 /qpid/branches/qpid-2935/qpid/cpp/src/tests:1061302-1072333 /qpid/branches/qpid-3346/qpid/cpp/src/tests:1144319-1179855 +/qpid/trunk/qpid/cpp/src/tests:1643238-1655056 Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/assertions.py URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/assertions.py?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/assertions.py (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/assertions.py Tue Jan 27 15:00:13 2015 @@ -177,3 +177,18 @@ class AssertionTests (VersionTest): assert False, "Expected assertion to fail on unspecified option" except AssertionFailed: None except MessagingError: None + + def test_queue_autodelete_timeout(self): + name = str(uuid4()) + # create subscription queue with 0-10 to be sure of name + ssn_0_10 = self.create_connection("amqp0-10", True).session() + ssn_0_10.receiver("amq.direct; {link:{name:%s,timeout:30}}" % name) + self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name) + ssn_0_10_other = self.create_connection("amqp0-10", True).session() + ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name) + try: + self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name) + ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name) + assert False, "Expected assertion to fail for auto_delete_timeout" + except AssertionFailed: None + except MessagingError: None Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/txshift.cpp URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/txshift.cpp?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/txshift.cpp (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/txshift.cpp Tue Jan 27 15:00:13 2015 @@ -40,7 +40,7 @@ namespace tests { struct Args : public qpid::TestOptions { std::string workQueue; - size_t workers; + uint workers; Args() : workQueue("txshift-control"), workers(1) { @@ -178,7 +178,7 @@ int main(int argc, char** argv) worker.run(); } else { boost::ptr_vector<Worker> workers; - for (size_t i = 0; i < opts.workers; i++) { + for (uint i = 0; i < opts.workers; i++) { workers.push_back(new Worker(connection, opts.workQueue)); } std::for_each(workers.begin(), workers.end(), boost::bind(&Worker::start, _1)); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/Java-Broker-Appendix-Operational-Logging-Messages.xml URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/Java-Broker-Appendix-Operational-Logging-Messages.xml?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/Java-Broker-Appendix-Operational-Logging-Messages.xml (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/Java-Broker-Appendix-Operational-Logging-Messages.xml Tue Jan 27 15:00:13 2015 @@ -378,6 +378,15 @@ <para>Indicates that broker was shut down due to fatal error.</para> </entry> </row> + <row id="Java-Broker-Appendix-Operation-Logging-Message-BRK-1017"> + <entry morerows="1">BRK-1017</entry> + <entry>Process : PID <replaceable>process identifier</replaceable></entry> + </row> + <row> + <entry> + <para>Process identifier (PID) of the Broker process.</para> + </entry> + </row> </tbody> </tgroup> </table> Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/commonEntities.xml URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/commonEntities.xml?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/commonEntities.xml (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/doc/book/src/java-broker/commonEntities.xml Tue Jan 27 15:00:13 2015 @@ -43,7 +43,7 @@ <!ENTITY oracleBdbProductOverviewUrl "http://www.oracle.com/technetwork/products/berkeleydb/overview/index-093405.html"> <!ENTITY oracleBdbRepGuideUrl "http://oracle.com/cd/E17277_02/html/ReplicationGuide/"> <!ENTITY oracleBdbJavaDocUrl "http://docs.oracle.com/cd/E17277_02/html/java/"> -<!ENTITY oracleBdbProductVersion "5.0.97"> +<!ENTITY oracleBdbProductVersion "5.0.104"> <!ENTITY oracleJmxTutorial "http://docs.oracle.com/javase/tutorial/jmx/"> Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jan 27 15:00:13 2015 @@ -9,3 +9,4 @@ /qpid/branches/java-network-refactor/qpid/java:805429-821809 /qpid/branches/qpid-2935/qpid/java:1061302-1072333 /qpid/trunk/qpid:796646-796653 +/qpid/trunk/qpid/java:1643238-1655056 Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Tue Jan 27 15:00:13 2015 @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; + import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; @@ -32,19 +33,20 @@ import javax.jms.InvalidDestinationExcep import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.MessageListener; + import org.apache.qpid.amqp_1_0.client.AcknowledgeMode; import org.apache.qpid.amqp_1_0.client.ConnectionErrorException; import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.client.Receiver; import org.apache.qpid.amqp_1_0.client.Transaction; import org.apache.qpid.amqp_1_0.jms.MessageConsumer; +import org.apache.qpid.amqp_1_0.jms.MessageConsumerException; import org.apache.qpid.amqp_1_0.jms.Queue; import org.apache.qpid.amqp_1_0.jms.QueueReceiver; import org.apache.qpid.amqp_1_0.jms.Session; import org.apache.qpid.amqp_1_0.jms.TemporaryDestination; import org.apache.qpid.amqp_1_0.jms.Topic; import org.apache.qpid.amqp_1_0.jms.TopicSubscriber; -import org.apache.qpid.amqp_1_0.jms.MessageConsumerException; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; @@ -333,10 +335,23 @@ public class MessageConsumerImpl impleme message.setFromTopic(_isTopicSubscriber); if(redelivery) { + UnsignedInteger failures = message.getDeliveryFailures(); + if(!message.getJMSRedelivered()) { message.setJMSRedelivered(true); } + + if(failures == null) + { + message.setDeliveryFailures(UnsignedInteger.ONE); + } + else + { + message.setDeliveryFailures(failures.add(UnsignedInteger.ONE)); + } + + } return message; Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1655057&r1=1655056&r2=1655057&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Tue Jan 27 15:00:13 2015 @@ -530,6 +530,7 @@ public class Receiver implements Deliver { release(msg); } + _session.removeReceiver(this); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
