Author: kpvdr
Date: Wed Jan 15 22:07:30 2014
New Revision: 1558589

URL: http://svn.apache.org/r1558589
Log:
QPID-5483: [linearstore] Recovery of journal with partly written record fails 
with "JERR_JREC_BADRECTAIL: Invalid data record tail" error message

Modified:
    qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES Wed Jan 15 22:07:30 2014
@@ -17,52 +17,70 @@
 # under the License.
 #
 
-LinearStore issues:
+Linear Store issues:
 
-Store:
-------
-
-1. (FIXED) Overwrite identity: When recovering a previously used file, if the 
write boundary coincides with old record
-   start, no way of discriminating old from new at boundary (used to use OWI).
-
-2. (FIXED) QPID-5357: Recycling files while in use not working, however, files 
are recovered to EFP during recovery. Must solve
-   #1 first.
-
-3. (FIXED) QPID-5358: Checksum not implemented in record tail, not checked 
during read.
-
-4. QPID-5359: Rework qpid management parameters and controls (QMF).
-
-5. QPID-5360: Consistent logging: rework logging to provide uniform and 
consistent logging from store (both logging
-   level and places where logging occurs).
-
-6. QPID-5361: No tests
-   * No existing tests for linearstore:
-   ** Basic broker-level tests for txn and non-txn recovery
-   ** Store-level tests which check write boundary conditions
-   ** Unit tests
-   ** Basic performance tests
-
-7: QPID-5362: No tools
-   * Store analysis and status
-   * Recovery/reading of message content
-
-8. One journal file lost when queue deleted. All files except for one are 
recycled back to the EFP.
-
-9. Complete exceptions - several exceptions thrown using jexception have no 
exception numbers
-
-Current bugs and performance issues:
-------------------------------------
-1. BZ 1035843 - Slow performance for producers
-2. (FIXED) QPID-5387 (BZ 1036071) - Crash when deleting queue
-3. (FIXED) QPID-5388 (BZ 1035802) - Segmentation fault when recovering empty 
queue
-4. (UNABLE TO REPRODUCE) BZ 1036026 - Unable to create durable queue - framing 
error - possibly caused by running both stores at the same time
-5. (UNABLE TO REPRODUCE) BZ 1038599 - Abort when deleting used queue after 
restart - may be dup of QPID-5387 (BZ 1036071)
-6. BZ 1039522 - Crash during recovery - JournalFile::getFqFileName() 
-JERR_JREC_BADRECTAIL
-7. BZ 1039525 - Crash during recovery - journal::jexception - 
JERR_JREC_BADRECTAIL
-8. (FIXED) QPID-5442 (BZ 1039949) - DTX test failure - missing XIDs
-9. (FIXED) QPID-5460 (BZ 1051097) - Transactional messages lost during recovery
-10. QPID-5464 - Incompletely created journal files accumulate in EFP
-11. QPID-5473 (BZ 1051924) - Recovery where last record in file is truncated 
(ie spans files), but following file is uninitialized causes crash
+Current/pending:
+================
+ Q-JIRA RHBZ     Description / Comments
+ ------ -------  ----------------------
+   5359 -        Linearstore: Implement new management schema and wire into 
store
+   5360 -        Linearstore: Evaluate and rework logging to produce a 
consistent log outputConsistent logging
+   5361 -        Linearstore: No tests for linearstore functionality currently 
exist
+                   * No existing tests for linearstore:
+                   ** Basic broker-level tests for txn and non-txn recovery
+                   ** Store-level tests which check write boundary conditions
+                   ** EFP tests, including file recovery, error management
+                   ** Unit tests
+                   ** Basic performance tests
+   5362 -        Linearstore: No store tools exist for examining the journals
+                   svn r.1558888 2014-01-09: WIP checkin for linearstore 
version of qpid_qls_analyze. Needs testing and tidy-up.
+                   * Store analysis and status
+                   * Recovery/reading of message content
+                   * Empty file pool status and management
+   5464 -        [linearstore] Incompletely created journal files accumulate 
in EFP
+   5479 1053701  [linearstore] Using recovered store results in 
"JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. 
(JournalFile::submittedDblkCount)" error message
+   5480 1053749  [linearstore] Recovery of store failure with 
"JERR_MAP_NOTFOUND: Key not found in map." error message
+   -    1035843  Slow performance for producers
+   -    1036026  [LinearStore] Qpid linear store unable to create durable 
queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
+                   UNABLE TO REPRODUCE - but Frantizek has additional info
+   -    1039522  Qpid crashes while recovering from linear store around 
apid::linearstore::journal::JournalFile::getFqFileName() including 
enq_rec::decode() threw JERR_JREC_BAD_RECTAIL
+   -    1039525  Qpid crashes while recovering from linear store around 
apid::linearstore::journal::jexception::format including enq_rec::decode() 
threw JERR_JREC_BAD_REC_TAIL
+                   * Possible dup of 1039522
+
+Fixed/closed:
+=============
+ Q-JIRA RHBZ     Description / Comments
+ ------ -------  ----------------------
+   5357 1052518  Linearstore: Empty file recycling not functional
+                   svn r.1545563 2013-11-26: Propsed fix
+   5358 1052727  Linearstore: Checksums not implemented in record tail
+                   svn r.1547601 2013-12-03: Propsed fix
+   5387 1036071  Linearstore: Segmentation fault when deleting queue
+                   svn r.1547641 2013-12-03: Propsed fix
+   5388 1035802  Linearstore: Segmentation fault when recovering empty queue
+                   svn r.1547921 2013-12-04: Propsed fix
+NO-JIRA -        Added missing Apache copyright/license text
+                   svn r.1551304 2013-12-16: Propsed fix
+   5425 1052445  Linearstore: Transaction Prepared List (TPL) fails with 
jexception 0x0402 AtomicCounter::addLimit() threw JERR_JNLF_FILEOFFSOVFL
+                   svn r.1551361 2013-12-16: Proposed fix
+   5442 1039949  Linearstore: Dtx recover test fails
+                   svn r.1552772 2013-12-20: Proposed fix
+   5444 1052775  Linearstore: Recovering from qpid-txtest fails with 
"Inconsistent TPL 2PC count" error message
+                   svn r.1553148 2013-12-23: Proposed fix
+   -    1038599  [LinearStore] Abort when deleting used queue after restart
+                   CLOSED-NOTABUG 2014-01-06
+   5460 1051097  [linearstore] Recovery of store which contains prepared but 
incomplete transactions results in message loss
+                   svn r.1556892 2014-01-09: Proposed fix
+   5473 1051924  [linearstore] Recovery of journal in which last logical file 
contains truncated record causes crash
+                   svn r.1557620 2014-01-12: Proposed fix
+
+Future:
+=======
+* One journal file lost when queue deleted. All files except for one are 
recycled back to the EFP.
+* Complete exceptions - several exceptions thrown using jexception have no 
exception numbers
+* Investigate ability of store to detect missing journal files, especially 
from logical end of a journal
+* Investigate ability of store to handle file muddle-ups (ie journal files 
from EFP which are not zeroed or other journals)
+* Look at improving the efficiency of recovery - right now the entire store is 
read once, and then each recovered record xid and data is read again
 
 Code tidy-up
 ------------

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/Checksum.cpp Wed Jan 15 
22:07:30 2014
@@ -30,9 +30,11 @@ Checksum::Checksum() : a(1UL), b(0UL), M
 Checksum::~Checksum() {}
 
 void Checksum::addData(const unsigned char* data, const std::size_t len) {
-    for (uint32_t i = 0; i < len; i++) {
-        a = (a + data[i]) % MOD_ADLER;
-        b = (a + b) % MOD_ADLER;
+    if (data) {
+        for (uint32_t i = 0; i < len; i++) {
+            a = (a + data[i]) % MOD_ADLER;
+            b = (a + b) % MOD_ADLER;
+        }
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp 
(original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Wed 
Jan 15 22:07:30 2014
@@ -221,28 +221,34 @@ bool RecoveryManager::readNextRemainingR
 
     // Check enqueue record checksum
     Checksum checksum;
-    checksum.addData((unsigned char*)&enqueueHeader, sizeof(::enq_hdr_t));
+    checksum.addData((const unsigned char*)&enqueueHeader, 
sizeof(::enq_hdr_t));
     if (xidSize > 0) {
-        checksum.addData((unsigned char*)*xidPtrPtr, xidSize);
+        checksum.addData((const unsigned char*)*xidPtrPtr, xidSize);
     }
     if (dataSize > 0) {
-        checksum.addData((unsigned char*)*dataPtrPtr, dataSize);
+        checksum.addData((const unsigned char*)*dataPtrPtr, dataSize);
     }
     ::rec_tail_t enqueueTail;
     inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t));
     uint32_t cs = checksum.getChecksum();
 //std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " 
rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; 
// DEBUG
-    int res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
+    uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
     if (res != 0) {
         std::stringstream oss;
-        switch (res) {
-          case 1: oss << std::hex << "Magic: expected 0x" << 
~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic; break;
-          case 2: oss << std::hex << "Serial: expected 0x" << 
enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial; break;
-          case 3: oss << std::hex << "Record Id: expected 0x" << 
enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid; break;
-          case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 
0x" << enqueueTail._checksum; break;
-          default: oss << "Unknown error " << res;
+        oss << "Bad record tail:" << std::hex;
+        if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+            oss << std::endl << "  Magic: expected 0x" << 
~enqueueHeader._rhdr._magic << "; found 0x" << enqueueTail._xmagic;
         }
-        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", 
"decode"); // TODO: Don't throw exception, log info
+        if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+            oss << std::endl << "  Serial: expected 0x" << 
enqueueHeader._rhdr._serial << "; found 0x" << enqueueTail._serial;
+        }
+        if (res & ::REC_TAIL_RID_ERR_MASK) {
+            oss << std::endl << "  Record Id: expected 0x" << 
enqueueHeader._rhdr._rid << "; found 0x" << enqueueTail._rid;
+        }
+        if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+            oss << std::endl << "  Checksum: expected 0x" << cs << "; found 
0x" << enqueueTail._checksum;
+        }
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), 
"RecoveryManager", "readNextRemainingRecord"); // TODO: Don't throw exception, 
log info
     }
 
     // Set data token
@@ -472,7 +478,13 @@ bool RecoveryManager::decodeRecord(jrec&
             done = record.decode(headerRecord, &inFileStream_, 
cumulativeSizeRead);
         }
         catch (const jexception& e) {
-            journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
+            if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) {
+                std::ostringstream oss;
+                oss << jerrno::err_msg(e.err_code()) << e.additional_info();
+                journalLogRef_.log(JournalLog::LOG_INFO, queueName_, 
oss.str());
+            } else {
+                journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
+            }
             checkJournalAlignment(start_file_offs);
             return false;
         }
@@ -602,7 +614,6 @@ bool RecoveryManager::getNextRecordHeade
                             oss << std::hex << "_tmap.set_aio_compl: txn_enq 
xid=\"" << xid << "\" rid=0x" << h._rid;
                             throw jexception(jerrno::JERR_MAP_NOTFOUND, 
oss.str(), "RecoveryManager", "getNextRecordHeader");
                         }
-                        std::free(xidp);
                     } else {
                         if (enqueueMapRef_.insert_pfid(h._rid, start_fid, 
file_pos) < enq_map::EMAP_OK) { // fail
                             // The only error code emap::insert_pfid() returns 
is enq_map::EMAP_DUP_RID.
@@ -641,7 +652,6 @@ bool RecoveryManager::getNextRecordHeade
                         oss << std::hex << "_tmap.set_aio_compl: txn_deq 
xid=\"" << xid << "\" rid=0x" << dr.rid();
                         throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), 
"RecoveryManager", "getNextRecordHeader");
                     }
-                    std::free(xidp);
                 } else {
                     uint64_t enq_fid;
                     if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, 
true) == enq_map::EMAP_OK) { // ignore not found error
@@ -675,7 +685,6 @@ bool RecoveryManager::getNextRecordHeade
                         enqueueMapRef_.unlock(itr->drid_); // ignore not found 
error
                     }
                 }
-                std::free(xidp);
             }
             break;
         case QLS_TXC_MAGIC:
@@ -711,7 +720,6 @@ bool RecoveryManager::getNextRecordHeade
                             fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
                     }
                 }
-                std::free(xidp);
             }
             break;
         case QLS_EMPTY_MAGIC:

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp Wed Jan 15 
22:07:30 2014
@@ -32,7 +32,7 @@ namespace journal {
 
 deq_rec::deq_rec():
         _xidp(0),
-        _buff(0)
+        _xid_buff(0)
 {
     ::deq_hdr_init(&_deq_hdr, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 0);
     ::rec_tail_copy(&_deq_tail, &_deq_hdr._rhdr, 0);
@@ -53,7 +53,7 @@ deq_rec::reset(const uint64_t serial, co
     _deq_hdr._deq_rid = drid;
     _deq_hdr._xidsize = xidlen;
     _xidp = xidp;
-    _buff = 0;
+    _xid_buff = 0;
     _deq_tail._serial = serial;
     _deq_tail._rid = rid;
     _deq_tail._checksum = 0UL;
@@ -192,15 +192,15 @@ deq_rec::decode(::rec_hdr_t& h, std::ifs
         // Read header, allocate (if req'd) for xid
         if (_deq_hdr._xidsize)
         {
-            _buff = std::malloc(_deq_hdr._xidsize);
-            MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+            _xid_buff = std::malloc(_deq_hdr._xidsize);
+            MALLOC_CHK(_xid_buff, "_buff", "enq_rec", "rcv_decode");
         }
     }
     if (rec_offs < sizeof(_deq_hdr) + _deq_hdr._xidsize)
     {
         // Read xid (or continue reading xid)
         std::size_t offs = rec_offs - sizeof(_deq_hdr);
-        ifsp->read((char*)_buff + offs, _deq_hdr._xidsize - offs);
+        ifsp->read((char*)_xid_buff + offs, _deq_hdr._xidsize - offs);
         std::size_t size_read = ifsp->gcount();
         rec_offs += size_read;
         if (size_read < _deq_hdr._xidsize - offs)
@@ -228,39 +228,22 @@ deq_rec::decode(::rec_hdr_t& h, std::ifs
             assert(!ifsp->fail() && !ifsp->bad());
             return false;
         }
+        check_rec_tail();
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     assert(!ifsp->fail() && !ifsp->bad());
-    if (_deq_hdr._xidsize) {
-        Checksum checksum;
-        checksum.addData((unsigned char*)&_deq_hdr, sizeof(_deq_hdr));
-        checksum.addData((unsigned char*)_buff, _deq_hdr._xidsize);
-        uint32_t cs = checksum.getChecksum();
-        int res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
-        if (res != 0) {
-            std::stringstream oss;
-            switch (res) {
-              case 1: oss << std::hex << "Magic: expected 0x" << 
~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic; break;
-              case 2: oss << std::hex << "Serial: expected 0x" << 
_deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial; break;
-              case 3: oss << std::hex << "Record Id: expected 0x" << 
_deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid; break;
-              case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; 
found 0x" << _deq_tail._checksum; break;
-              default: oss << "Unknown error " << res;
-            }
-            throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), 
"deq_rec", "decode"); // TODO: Don't throw exception, log info
-        }
-    }
     return true;
 }
 
 std::size_t
 deq_rec::get_xid(void** const xidpp)
 {
-    if (!_buff)
+    if (!_xid_buff)
     {
         *xidpp = 0;
         return 0;
     }
-    *xidpp = _buff;
+    *xidpp = _xid_buff;
     return _deq_hdr._xidsize;
 }
 
@@ -291,9 +274,40 @@ deq_rec::rec_size() const
 }
 
 void
+deq_rec::check_rec_tail() const {
+    Checksum checksum;
+    checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t));
+    if (_deq_hdr._xidsize > 0) {
+        checksum.addData((const unsigned char*)_xid_buff, _deq_hdr._xidsize);
+    }
+    uint32_t cs = checksum.getChecksum();
+    uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
+    if (res != 0) {
+        std::stringstream oss;
+        oss << std::hex;
+        if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+            oss << std::endl << "  Magic: expected 0x" << 
~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic;
+        }
+        if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+            oss << std::endl << "  Serial: expected 0x" << 
_deq_hdr._rhdr._serial << "; found 0x" << _deq_tail._serial;
+        }
+        if (res & ::REC_TAIL_RID_ERR_MASK) {
+            oss << std::endl << "  Record Id: expected 0x" << 
_deq_hdr._rhdr._rid << "; found 0x" << _deq_tail._rid;
+        }
+        if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+            oss << std::endl << "  Checksum: expected 0x" << cs << "; found 
0x" << _deq_tail._checksum;
+        }
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "deq_rec", 
"check_rec_tail");
+    }
+}
+
+void
 deq_rec::clean()
 {
-    // clean up allocated memory here
+    if (_xid_buff) {
+        std::free(_xid_buff);
+        _xid_buff = 0;
+    }
 }
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h Wed Jan 15 
22:07:30 2014
@@ -39,7 +39,7 @@ class deq_rec : public jrec
 private:
     ::deq_hdr_t _deq_hdr;   ///< Local instance of dequeue header struct
     const void* _xidp;      ///< xid pointer for encoding (writing to disk)
-    void* _buff;            ///< Pointer to buffer to receive data read from 
disk
+    void* _xid_buff;        ///< Pointer to buffer to receive xid read from 
disk
     ::rec_tail_t _deq_tail; ///< Local instance of enqueue tail struct, only 
encoded if XID is present
 
 public:
@@ -59,6 +59,7 @@ public:
     inline std::size_t data_size() const { return 0; } // This record never 
carries data
     std::size_t xid_size() const;
     std::size_t rec_size() const;
+    void check_rec_tail() const;
 
 private:
     virtual void clean();

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp Wed Jan 15 
22:07:30 2014
@@ -34,7 +34,8 @@ enq_rec::enq_rec():
         jrec(), // superclass
         _xidp(0),
         _data(0),
-        _buff(0)
+        _xid_buff(0),
+        _data_buff(0)
 {
     ::enq_hdr_init(&_enq_hdr, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, 0, 0, 0, 0, 
false);
     ::rec_tail_copy(&_enq_tail, &_enq_hdr._rhdr, 0);
@@ -57,7 +58,6 @@ enq_rec::reset(const uint64_t serial, co
     _enq_hdr._dsize = dlen;
     _xidp = xidp;
     _data = dbuf;
-    _buff = 0;
     _enq_tail._serial = serial;
     _enq_tail._rid = rid;
 }
@@ -229,15 +229,20 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
         rec_offs = sizeof(::enq_hdr_t);
         if (_enq_hdr._xidsize > 0)
         {
-            _buff = std::malloc(_enq_hdr._xidsize);
-            MALLOC_CHK(_buff, "_buff", "enq_rec", "rcv_decode");
+            _xid_buff = std::malloc(_enq_hdr._xidsize);
+            MALLOC_CHK(_xid_buff, "_xid_buff", "enq_rec", "decode");
+        }
+        if (_enq_hdr._dsize > 0)
+        {
+            _data_buff = std::malloc(_enq_hdr._dsize);
+            MALLOC_CHK(_data_buff, "_data_buff", "enq_rec", "decode")
         }
     }
     if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize)
     {
         // Read xid (or continue reading xid)
         std::size_t offs = rec_offs - sizeof(_enq_hdr);
-        ifsp->read((char*)_buff + offs, _enq_hdr._xidsize - offs);
+        ifsp->read((char*)_xid_buff + offs, _enq_hdr._xidsize - offs);
         std::size_t size_read = ifsp->gcount();
         rec_offs += size_read;
         if (size_read < _enq_hdr._xidsize - offs)
@@ -253,9 +258,9 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
     {
         if (rec_offs < sizeof(_enq_hdr) + _enq_hdr._xidsize +  _enq_hdr._dsize)
         {
-            // Ignore data (or continue ignoring data)
+            // Read data (or continue reading data)
             std::size_t offs = rec_offs - sizeof(_enq_hdr) - _enq_hdr._xidsize;
-            ifsp->ignore(_enq_hdr._dsize - offs);
+            ifsp->read((char*)_data_buff + offs, _enq_hdr._dsize - offs);
             std::size_t size_read = ifsp->gcount();
             rec_offs += size_read;
             if (size_read < _enq_hdr._dsize - offs)
@@ -286,6 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
             assert(!ifsp->fail() && !ifsp->bad());
             return false;
         }
+        check_rec_tail();
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     assert(!ifsp->fail() && !ifsp->bad());
@@ -295,27 +301,25 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
 std::size_t
 enq_rec::get_xid(void** const xidpp)
 {
-    if (!_buff || !_enq_hdr._xidsize)
-    {
+    if (!_xid_buff || !_enq_hdr._xidsize) {
         *xidpp = 0;
         return 0;
     }
-    *xidpp = _buff;
+    *xidpp = _xid_buff;
     return _enq_hdr._xidsize;
 }
 
 std::size_t
 enq_rec::get_data(void** const datapp)
 {
-    if (!_buff)
-    {
+    if (!_data_buff) {
         *datapp = 0;
         return 0;
     }
     if (::is_enq_external(&_enq_hdr))
         *datapp = 0;
     else
-        *datapp = (void*)((char*)_buff + _enq_hdr._xidsize);
+        *datapp = _data_buff;
     return _enq_hdr._dsize;
 }
 
@@ -348,9 +352,46 @@ enq_rec::rec_size(const std::size_t xids
 }
 
 void
-enq_rec::clean()
-{
-    // clean up allocated memory here
+enq_rec::check_rec_tail() const {
+    Checksum checksum;
+    checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t));
+    if (_enq_hdr._xidsize > 0) {
+        checksum.addData((const unsigned char*)_xid_buff, _enq_hdr._xidsize);
+    }
+    if (_enq_hdr._dsize > 0) {
+        checksum.addData((const unsigned char*)_data_buff, _enq_hdr._dsize);
+    }
+    uint32_t cs = checksum.getChecksum();
+    uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs);
+    if (res != 0) {
+        std::stringstream oss;
+        oss << std::hex;
+        if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+            oss << std::endl << "  Magic: expected 0x" << 
~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic;
+        }
+        if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+            oss << std::endl << "  Serial: expected 0x" << 
_enq_hdr._rhdr._serial << "; found 0x" << _enq_tail._serial;
+        }
+        if (res & ::REC_TAIL_RID_ERR_MASK) {
+            oss << std::endl << "  Record Id: expected 0x" << 
_enq_hdr._rhdr._rid << "; found 0x" << _enq_tail._rid;
+        }
+        if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+            oss << std::endl << "  Checksum: expected 0x" << cs << "; found 
0x" << _enq_tail._checksum;
+        }
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "enq_rec", 
"check_rec_tail");
+    }
+}
+
+void
+enq_rec::clean() {
+    if (_xid_buff) {
+        std::free(_xid_buff);
+        _xid_buff = 0;
+    }
+    if (_data_buff) {
+        std::free(_data_buff);
+        _data_buff = 0;
+    }
 }
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h Wed Jan 15 
22:07:30 2014
@@ -40,7 +40,8 @@ private:
     ::enq_hdr_t _enq_hdr;   ///< Local instance of enqueue header struct
     const void* _xidp;      ///< xid pointer for encoding (for writing to disk)
     const void* _data;      ///< Pointer to data to be written to disk
-    void* _buff;            ///< Pointer to buffer to receive data read from 
disk
+    void* _xid_buff;
+    void* _data_buff;
     ::rec_tail_t _enq_tail; ///< Local instance of enqueue tail struct
 
 public:
@@ -62,6 +63,7 @@ public:
     std::size_t rec_size() const;
     static std::size_t rec_size(const std::size_t xidsize, const std::size_t 
dsize, const bool external);
     inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
+    void check_rec_tail() const;
 
 private:
     virtual void clean();

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp Wed Jan 15 
22:07:30 2014
@@ -32,7 +32,7 @@ namespace journal {
 
 txn_rec::txn_rec():
         _xidp(0),
-        _buff(0)
+        _xid_buff(0)
 {
     ::txn_hdr_init(&_txn_hdr, 0, QLS_JRNL_VERSION, 0, 0, 0, 0);
     ::rec_tail_init(&_txn_tail, 0, 0, 0, 0);
@@ -52,7 +52,7 @@ txn_rec::reset(const bool commitFlag, co
     _txn_hdr._rhdr._rid = rid;
     _txn_hdr._xidsize = xidlen;
     _xidp = xidp;
-    _buff = 0;
+    _xid_buff = 0;
     _txn_tail._xmagic = ~_txn_hdr._rhdr._magic;
     _txn_tail._serial = serial;
     _txn_tail._rid = rid;
@@ -184,14 +184,14 @@ txn_rec::decode(::rec_hdr_t& h, std::ifs
         ::rec_hdr_copy(&_txn_hdr._rhdr, &h);
         ifsp->read((char*)&_txn_hdr._xidsize, sizeof(_txn_hdr._xidsize));
         rec_offs = sizeof(::txn_hdr_t);
-        _buff = std::malloc(_txn_hdr._xidsize);
-        MALLOC_CHK(_buff, "_buff", "txn_rec", "rcv_decode");
+        _xid_buff = std::malloc(_txn_hdr._xidsize);
+        MALLOC_CHK(_xid_buff, "_buff", "txn_rec", "rcv_decode");
     }
     if (rec_offs < sizeof(txn_hdr_t) + _txn_hdr._xidsize)
     {
         // Read xid (or continue reading xid)
         std::size_t offs = rec_offs - sizeof(txn_hdr_t);
-        ifsp->read((char*)_buff + offs, _txn_hdr._xidsize - offs);
+        ifsp->read((char*)_xid_buff + offs, _txn_hdr._xidsize - offs);
         std::size_t size_read = ifsp->gcount();
         rec_offs += size_read;
         if (size_read < _txn_hdr._xidsize - offs)
@@ -218,39 +218,23 @@ txn_rec::decode(::rec_hdr_t& h, std::ifs
             assert(!ifsp->fail() && !ifsp->bad());
             return false;
         }
+        check_rec_tail();
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     assert(!ifsp->fail() && !ifsp->bad());
     assert(_txn_hdr._xidsize > 0);
-
-    Checksum checksum;
-    checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr));
-    checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize);
-    uint32_t cs = checksum.getChecksum();
-    int res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
-    if (res != 0) {
-        std::stringstream oss;
-        switch (res) {
-          case 1: oss << std::hex << "Magic: expected 0x" << 
~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic; break;
-          case 2: oss << std::hex << "Serial: expected 0x" << 
_txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial; break;
-          case 3: oss << std::hex << "Record Id: expected 0x" << 
_txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid; break;
-          case 4: oss << std::hex << "Checksum: expected 0x" << cs << "; found 
0x" << _txn_tail._checksum; break;
-          default: oss << "Unknown error " << res;
-        }
-        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", 
"decode"); // TODO: Don't throw exception, log info
-    }
     return true;
 }
 
 std::size_t
 txn_rec::get_xid(void** const xidpp)
 {
-    if (!_buff)
+    if (!_xid_buff)
     {
         *xidpp = 0;
         return 0;
     }
-    *xidpp = _buff;
+    *xidpp = _xid_buff;
     return _txn_hdr._xidsize;
 }
 
@@ -282,9 +266,40 @@ txn_rec::rec_size() const
 }
 
 void
+txn_rec::check_rec_tail() const {
+    Checksum checksum;
+    checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t));
+    if (_txn_hdr._xidsize > 0) {
+        checksum.addData((const unsigned char*)_xid_buff, _txn_hdr._xidsize);
+    }
+    uint32_t cs = checksum.getChecksum();
+    uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
+    if (res != 0) {
+        std::stringstream oss;
+        oss << std::hex;
+        if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
+            oss << std::endl << "  Magic: expected 0x" << 
~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic;
+        }
+        if (res & ::REC_TAIL_SERIAL_ERR_MASK) {
+            oss << std::endl << "  Serial: expected 0x" << 
_txn_hdr._rhdr._serial << "; found 0x" << _txn_tail._serial;
+        }
+        if (res & ::REC_TAIL_RID_ERR_MASK) {
+            oss << std::endl << "  Record Id: expected 0x" << 
_txn_hdr._rhdr._rid << "; found 0x" << _txn_tail._rid;
+        }
+        if (res & ::REC_TAIL_CHECKSUM_ERR_MASK) {
+            oss << std::endl << "  Checksum: expected 0x" << cs << "; found 
0x" << _txn_tail._checksum;
+        }
+        throw jexception(jerrno::JERR_JREC_BADRECTAIL, oss.str(), "txn_rec", 
"check_rec_tail");
+    }
+}
+
+void
 txn_rec::clean()
 {
-    // clean up allocated memory here
+    if (_xid_buff) {
+        std::free(_xid_buff);
+        _xid_buff = 0;
+    }
 }
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h Wed Jan 15 
22:07:30 2014
@@ -39,7 +39,7 @@ class txn_rec : public jrec
 private:
     ::txn_hdr_t _txn_hdr;   ///< Local instance of transaction header struct
     const void* _xidp;      ///< xid pointer for encoding (writing to disk)
-    void* _buff;            ///< Pointer to buffer to receive data read from 
disk
+    void* _xid_buff;        ///< Pointer to buffer to receive xid read from 
disk
     ::rec_tail_t _txn_tail; ///< Local instance of enqueue tail struct
 
 public:
@@ -57,6 +57,7 @@ public:
     std::size_t xid_size() const;
     std::size_t rec_size() const;
     inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
+    void check_rec_tail() const;
 
 private:
     virtual void clean();

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.c Wed Jan 
15 22:07:30 2014
@@ -36,10 +36,11 @@ void rec_tail_copy(rec_tail_t* dest, con
     dest->_rid = src->_rid;
 }
 
-int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const 
uint32_t checksum) {
-    if (tail->_xmagic != ~header->_magic) return 1;
-    if (tail->_serial != header->_serial) return 2;
-    if (tail->_rid != header->_rid) return 3;
-    if (tail->_checksum != checksum) return 4;
-    return 0;
+uint16_t rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const 
uint32_t checksum) {
+    uint16_t err = 0;
+    if (tail->_xmagic != ~header->_magic) err |= REC_TAIL_MAGIC_ERR_MASK;
+    if (tail->_serial != header->_serial) err |= REC_TAIL_SERIAL_ERR_MASK;
+    if (tail->_rid != header->_rid) err |= REC_TAIL_RID_ERR_MASK;
+    if (tail->_checksum != checksum) err |= REC_TAIL_CHECKSUM_ERR_MASK;
+    return err;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h?rev=1558589&r1=1558588&r2=1558589&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/utils/rec_tail.h Wed Jan 
15 22:07:30 2014
@@ -63,10 +63,15 @@ typedef struct rec_tail_t {
     uint64_t _rid;                     /**< Record ID (rotating 64-bit 
counter) */
 } rec_tail_t;
 
+static const uint16_t REC_TAIL_MAGIC_ERR_MASK = 0x01;
+static const uint16_t REC_TAIL_SERIAL_ERR_MASK = 0x02;
+static const uint16_t REC_TAIL_RID_ERR_MASK = 0x04;
+static const uint16_t REC_TAIL_CHECKSUM_ERR_MASK = 0x08;
+
 void rec_tail_init(rec_tail_t* dest, const uint32_t xmagic, const uint32_t 
checksum, const uint64_t serial,
                    const uint64_t rid);
 void rec_tail_copy(rec_tail_t* dest, const rec_hdr_t* src, const uint32_t 
checksum);
-int rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const 
uint32_t checksum);
+uint16_t rec_tail_check(const rec_tail_t* tail, const rec_hdr_t* header, const 
uint32_t checksum);
 
 #pragma pack()
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to