Author: kpvdr
Date: Wed May 21 17:33:51 2014
New Revision: 1596633
URL: http://svn.apache.org/r1596633
Log:
NO_JIRA: [linearstore] Update to ISSUES; whitespace fix from last checkin
Added:
qpid/trunk/qpid/tools/src/py/qlslibs/
- copied from r1592994, qpid/trunk/qpid/tools/src/py/qls/
qpid/trunk/qpid/tools/src/py/qpid-qls-analyze
- copied, changed from r1592994, qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py
Removed:
qpid/trunk/qpid/tools/src/py/qls/
qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py
Modified:
qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
qpid/trunk/qpid/tools/setup.py
qpid/trunk/qpid/tools/src/py/qlslibs/anal.py
qpid/trunk/qpid/tools/src/py/qlslibs/efp.py
qpid/trunk/qpid/tools/src/py/qlslibs/err.py
qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py
qpid/trunk/qpid/tools/src/py/qlslibs/utils.py
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1596633&r1=1596632&r2=1596633&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES Wed May 21 17:33:51 2014
@@ -45,8 +45,6 @@ Current/pending:
* Recovery/reading of message content
* Empty file pool status and management
5464 - [linearstore] Incompletely created journal files accumulate
in EFP
-# 5750 1078142 [linearstore] qpidd closes connection with (distributed)
transactional client while checking previous transaction, broker signals error
(closed by error: Queue Ve0-2: async_dequeue() failed: exception 0x0103
wmgr::get_events() threw JERR__AIO: AIO error)
- * jexception 0x0103 wmgr::get_events() threw JERR__AIO: AIO
error. (AIO write operation failed: Invalid argument (-22) [pg=0 size=8192
offset=4096 fh=22])
- 1088944 [Linearstore] store does not return all files to EFP after
purging big queue
- 1078937 [linearstore] Installation and tests for new store analysis
tool qpid-qls-analyze
@@ -119,18 +117,23 @@ NO-JIRA - Added missing Apache co
5661 - [linearstore] Set default cmake build to exclude linearstore
svn r.1584379 2014-04-03 Proposed solution.
* Run ccmake, select BUILD_LINEARSTORE to change its value
to ON to build.
+ 5750 1078142 [linearstore] qpidd closes connection with (distributed)
transactional client while checking previous transaction, broker signals error
(closed by error: Queue Ve0-2: async_dequeue() failed: exception 0x0103
wmgr::get_events() threw JERR__AIO: AIO error)
+ svn r.1594215 2014-05-13 Proposed solution.
+ * jexception 0x0103 wmgr::get_events() threw JERR__AIO: AIO
error. (AIO write operation failed: Invalid argument (-22) [pg=0 size=8192
offset=4096 fh=22])
+ 5767 1098118 [linearstore] broker segfaults when recovering journal file
with damaged header
+ svn r.1596509 2014-05-21 Proposed solution (committed by
pmoravec)
Ordered checkin list:
=====================
In order to port the linearstore changes from trunk to a branch, the following
svn checkins need to be ported in order:
-no. svn r Q-JIRA RHBZ Date
---- ------- ------- -------- ----------
+no. svn r Q-JIRA RHBZ Date Alt Committer
+--- ------- ------- -------- ---------- -------------
1. 1545563 5357 1052518 2013-11-26
2. 1547601 5358 1052727 2013-12-03
3. 1547641 5387 1036071 2013-12-03
4. 1547921 5388 1035802 2013-12-04
- 5. 1551304 NO-JIRA - 2013-12-16
+ 5. 1551304 NO-JIRA - 2013-12-16 (aconway)
6. 1551361 5425 1052445 2013-12-16
7. 1552772 5442 1039949 2013-12-20
8. 1553148 5444 1052775 2013-12-23
@@ -149,9 +152,11 @@ no. svn r Q-JIRA RHBZ Date
21. 1574513 5603 1063700 2014-03-05
22. 1575009 5607 1064181 2014-03-06
23. 1578899 5362 - 2014-03-18
-24. 1582730 5651 - 2014-03-28
+24. 1582730 5651 - 2014-03-28 (pmoravec)
25. 1583778 5362 - 2014-04-01
26. 1584379 5661 - 2014-04-03
+27. 1594215 5750 1078142 2014-05-13
+28. 1596509 5767 1098118 2014-05-21 (pmoravec)
See above sections for details on these checkins.
Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp?rev=1596633&r1=1596632&r2=1596633&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp Wed May 21 17:33:51 2014
@@ -186,7 +186,7 @@ EmptyFilePool* EmptyFilePoolManager::get
const efpDataSize_kib_t
efpDataSize_kib) {
EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber > 0 ?
partitionNumber : defaultPartitionNumber_);
if (efppp != 0)
- return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib :
defaultEfpDataSize_kib_);
+ return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib :
defaultEfpDataSize_kib_);
return 0;
}
Modified: qpid/trunk/qpid/tools/setup.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/setup.py?rev=1596633&r1=1596632&r2=1596633&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/setup.py (original)
+++ qpid/trunk/qpid/tools/setup.py Wed May 21 17:33:51 2014
@@ -47,6 +47,13 @@ setup(name="qpid-tools",
"src/py/qpid-stat",
"src/py/qpid-tool",
"src/py/qmf-tool"],
+ data_files=[("/usr/libexec", ["src/py/qpid-qls-analyze"]),
+ ("/usr/shared/qpid-tools/python/qlslibs",
+
["src/py/qlslibs/anal.py",
+ "src/py/qlslibs/efp.py",
+
"src/py/qlslibs/err.py",
+
"src/py/qlslibs/jrnl.py",
+ "src/py/qlslibs/utils.py"])],
url="http://qpid.apache.org/",
license="Apache Software License",
description="Diagnostic and management tools for Apache Qpid brokers.",
Modified: qpid/trunk/qpid/tools/src/py/qlslibs/anal.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/anal.py?rev=1596633&r1=1592994&r2=1596633&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qlslibs/anal.py (original)
+++ qpid/trunk/qpid/tools/src/py/qlslibs/anal.py Wed May 21 17:33:51 2014
@@ -18,15 +18,15 @@
#
"""
-Module: qls.anal
+Module: qlslibs.anal
Classes for recovery and analysis of a Qpid Linear Store (QLS).
"""
import os.path
-import qls.err
-import qls.jrnl
-import qls.utils
+import qlslibs.err
+import qlslibs.jrnl
+import qlslibs.utils
class HighCounter(object):
def __init__(self):
@@ -45,7 +45,7 @@ class JournalRecoveryManager(object):
JRNL_DIR_NAME = 'jrnl'
def __init__(self, directory, args):
if not os.path.exists(directory):
- raise qls.err.InvalidQlsDirectoryNameError(directory)
+ raise qlslibs.err.InvalidQlsDirectoryNameError(directory)
self.directory = directory
self.args = args
self.tpl = None
@@ -86,14 +86,14 @@ class JournalRecoveryManager(object):
status = '[Prepared, but interrupted during commit phase]'
else:
status = '[Prepared, but interrupted during abort phase]'
- print ' ', qls.utils.format_xid(xid), status
+ print ' ', qlslibs.utils.format_xid(xid), status
if prepared_list[xid] is None: # Prepared, but not committed or
aborted
enqueue_record = self.tpl.get_txn_map_record(xid)[0][1]
- dequeue_record =
qls.utils.create_record(qls.jrnl.DequeueRecord.MAGIC, \
-
qls.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
-
self.tpl.current_journal_file, \
-
self.high_rid_counter.get_next(), \
-
enqueue_record.record_id, xid, None)
+ dequeue_record =
qlslibs.utils.create_record(qlslibs.jrnl.DequeueRecord.MAGIC, \
+
qlslibs.jrnl.DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
+
self.tpl.current_journal_file, \
+
self.high_rid_counter.get_next(), \
+
enqueue_record.record_id, xid, None)
if txn_flag:
self.tpl.add_record(dequeue_record)
for queue_name in sorted(self.journals.keys()):
@@ -101,10 +101,10 @@ class JournalRecoveryManager(object):
if len(prepared_list) > 0:
print 'Completing prepared transactions in prepared transaction
list:'
for xid in prepared_list.keys():
- print ' ', qls.utils.format_xid(xid)
- transaction_record =
qls.utils.create_record(qls.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \
-
self.tpl.current_journal_file, \
-
self.high_rid_counter.get_next(), None, xid, None)
+ print ' ', qlslibs.utils.format_xid(xid)
+ transaction_record =
qlslibs.utils.create_record(qlslibs.jrnl.TransactionRecord.MAGIC_COMMIT, 0, \
+
self.tpl.current_journal_file, \
+
self.high_rid_counter.get_next(), None, xid, None)
if txn_flag:
self.tpl.add_record(transaction_record)
print
@@ -118,7 +118,7 @@ class EnqueueMap(object):
self.enq_map = {}
def add(self, journal_file, enq_record, locked_flag):
if enq_record.record_id in self.enq_map:
- raise
qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record)
+ raise
qlslibs.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record)
self.enq_map[enq_record.record_id] = [journal_file, enq_record,
locked_flag]
def contains(self, rid):
"""Return True if the map contains the given rid"""
@@ -129,14 +129,14 @@ class EnqueueMap(object):
del self.enq_map[deq_record.dequeue_record_id]
return enq_list
else:
- raise qls.err.RecordIdNotFoundError(journal_file.file_header,
deq_record)
+ raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header,
deq_record)
def get(self, record_id):
if record_id in self.enq_map:
return self.enq_map[record_id]
return None
def lock(self, journal_file, dequeue_record):
if dequeue_record.dequeue_record_id not in self.enq_map:
- raise qls.err.RecordIdNotFoundError(journal_file.file_header,
dequeue_record)
+ raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header,
dequeue_record)
self.enq_map[dequeue_record.dequeue_record_id][2] = True
def report_str(self, _, show_records):
"""Return a string containing a text report for all records in the
map"""
@@ -163,9 +163,9 @@ class EnqueueMap(object):
if self.enq_map[dequeue_record.dequeue_record_id][2]:
self.enq_map[dequeue_record.dequeue_record_id][2] = False
else:
- raise qls.err.RecordNotLockedError(journal_file.file_header,
dequeue_record)
+ raise
qlslibs.err.RecordNotLockedError(journal_file.file_header, dequeue_record)
else:
- raise qls.err.RecordIdNotFoundError(journal_file.file_header,
dequeue_record)
+ raise qlslibs.err.RecordIdNotFoundError(journal_file.file_header,
dequeue_record)
class TransactionMap(object):
"""
@@ -177,7 +177,7 @@ class TransactionMap(object):
def abort(self, xid):
"""Perform an abort operation for the given xid record"""
for journal_file, record, _ in self.txn_map[xid]:
- if isinstance(record, qls.jrnl.DequeueRecord):
+ if isinstance(record, qlslibs.jrnl.DequeueRecord):
if self.enq_map.contains(record.dequeue_record_id):
self.enq_map.unlock(journal_file, record)
else:
@@ -185,16 +185,16 @@ class TransactionMap(object):
del self.txn_map[xid]
def add(self, journal_file, record):
if record.xid is None:
- raise
qls.err.NonTransactionalRecordError(journal_file.file_header, record,
'TransactionMap.add()')
- if isinstance(record, qls.jrnl.DequeueRecord):
+ raise
qlslibs.err.NonTransactionalRecordError(journal_file.file_header, record,
'TransactionMap.add()')
+ if isinstance(record, qlslibs.jrnl.DequeueRecord):
try:
self.enq_map.lock(journal_file, record)
- except qls.err.RecordIdNotFoundError:
+ except qlslibs.err.RecordIdNotFoundError:
# Not in emap, look for rid in tmap - should not happen in
practice
txn_op = self._find_record_id(record.xid,
record.dequeue_record_id)
if txn_op != None:
if txn_op[2]:
- raise
qls.err.AlreadyLockedError(journal_file.file_header, record)
+ raise
qlslibs.err.AlreadyLockedError(journal_file.file_header, record)
txn_op[2] = True
if record.xid in self.txn_map:
self.txn_map[record.xid].append([journal_file, record, False]) #
append to existing list
@@ -204,7 +204,7 @@ class TransactionMap(object):
"""Perform a commit operation for the given xid record"""
mismatch_list = []
for journal_file, record, lock in self.txn_map[xid]:
- if isinstance(record, qls.jrnl.EnqueueRecord):
+ if isinstance(record, qlslibs.jrnl.EnqueueRecord):
self.enq_map.add(journal_file, record, lock) # Transfer enq to
emap
else:
if self.enq_map.contains(record.dequeue_record_id):
@@ -224,8 +224,8 @@ class TransactionMap(object):
if transaction_record.magic[-1] == 'a':
self.abort(transaction_record.xid)
else:
- raise qls.err.InvalidRecordTypeError(journal_file.file_header,
transaction_record,
- 'delete from Transaction Map')
+ raise qlslibs.err.InvalidRecordTypeError(journal_file.file_header,
transaction_record,
+ 'delete from Transaction
Map')
def get(self, xid):
if xid in self.txn_map:
return self.txn_map[xid]
@@ -240,7 +240,7 @@ class TransactionMap(object):
prepared_list = {}
for xid in self.get_xid_list():
for _, record, _ in self.txn_map[xid]:
- if isinstance(record, qls.jrnl.EnqueueRecord):
+ if isinstance(record, qlslibs.jrnl.EnqueueRecord):
prepared_list[xid] = None
else:
prepared_list[xid] =
record.is_transaction_complete_commit()
@@ -255,7 +255,7 @@ class TransactionMap(object):
if show_records:
rstr += ':'
for xid, op_list in self.txn_map.iteritems():
- rstr += '\n %s containing %d operations:' %
(qls.utils.format_xid(xid), len(op_list))
+ rstr += '\n %s containing %d operations:' %
(qlslibs.utils.format_xid(xid), len(op_list))
for journal_file, record, _ in op_list:
rstr += '\n 0x%x:%s' %
(journal_file.file_header.file_num, record)
else:
@@ -332,15 +332,15 @@ class Journal(object):
self.num_filler_records_required = None # TODO: Move into JournalFile
self.fill_to_offset = None
def add_record(self, record):
- if isinstance(record, qls.jrnl.EnqueueRecord) or isinstance(record,
qls.jrnl.DequeueRecord):
+ if isinstance(record, qlslibs.jrnl.EnqueueRecord) or
isinstance(record, qlslibs.jrnl.DequeueRecord):
if record.xid_size > 0:
self.txn_map.add(self.current_journal_file, record)
else:
self.enq_map.add(self.current_journal_file, record, False)
- elif isinstance(record, qls.jrnl.TransactionRecord):
+ elif isinstance(record, qlslibs.jrnl.TransactionRecord):
self.txn_map.delete(self.current_journal_file, record)
else:
- raise qls.err.InvalidRecordTypeError(self.current_journal_file,
record, 'add to Journal')
+ raise
qlslibs.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to
Journal')
def get_enq_map_record(self, rid):
return self.enq_map.get(rid)
def get_txn_map_record(self, xid):
@@ -356,9 +356,9 @@ class Journal(object):
while self._get_next_record(high_rid_counter):
pass
self._check_alignment()
- except qls.err.NoMoreFilesInJournalError:
+ except qlslibs.err.NoMoreFilesInJournalError:
print 'No more files in journal'
- except qls.err.FirstRecordOffsetMismatchError as err:
+ except qlslibs.err.FirstRecordOffsetMismatchError as err:
print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro
actual=0x%08x expected=0x%08x' % \
(err.get_expected_fro(), err.get_queue_name(),
err.get_file_number(), err.get_record_offset(),
err.get_expected_fro())
@@ -370,19 +370,19 @@ class Journal(object):
if xid in prepared_list.keys():
commit_flag = prepared_list[xid]
if commit_flag is None:
- print ' ', qls.utils.format_xid(xid), '- Assuming commit
after prepare'
+ print ' ', qlslibs.utils.format_xid(xid), '- Assuming
commit after prepare'
if txn_flag:
self.txn_map.commit(xid)
elif commit_flag:
- print ' ', qls.utils.format_xid(xid), '- Completing
interrupted commit operation'
+ print ' ', qlslibs.utils.format_xid(xid), '- Completing
interrupted commit operation'
if txn_flag:
self.txn_map.commit(xid)
else:
- print ' ', qls.utils.format_xid(xid), '- Completing
interrupted abort operation'
+ print ' ', qlslibs.utils.format_xid(xid), '- Completing
interrupted abort operation'
if txn_flag:
self.txn_map.abort(xid)
else:
- print ' ', qls.utils.format_xid(xid), '- Ignoring, not in
prepared transaction list'
+ print ' ', qlslibs.utils.format_xid(xid), '- Ignoring, not in
prepared transaction list'
if txn_flag:
self.txn_map.abort(xid)
def report(self, print_stats_flag):
@@ -408,25 +408,26 @@ class Journal(object):
if len(dir_entry_bits) == 2 and dir_entry_bits[1] ==
JournalRecoveryManager.JRNL_DIR_NAME:
fq_file_name = os.path.join(self.directory, dir_entry)
file_handle = open(fq_file_name)
- args = qls.utils.load_args(file_handle, qls.jrnl.RecordHeader)
- file_hdr = qls.jrnl.FileHeader(*args)
- file_hdr.init(file_handle, *qls.utils.load_args(file_handle,
qls.jrnl.FileHeader))
+ args = qlslibs.utils.load_args(file_handle,
qlslibs.jrnl.RecordHeader)
+ file_hdr = qlslibs.jrnl.FileHeader(*args)
+ file_hdr.init(file_handle,
*qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader))
if file_hdr.is_header_valid(file_hdr):
file_hdr.load(file_handle)
if file_hdr.is_valid():
- qls.utils.skip(file_handle,
file_hdr.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE)
+ qlslibs.utils.skip(file_handle,
+ file_hdr.file_header_size_sblks *
qlslibs.utils.DEFAULT_SBLK_SIZE)
self.files[file_hdr.file_num] = JournalFile(file_hdr)
self.file_num_list = sorted(self.files.keys())
self.file_num_itr = iter(self.file_num_list)
def _check_alignment(self): # TODO: Move into JournalFile
- remaining_sblks = self.last_record_offset % qls.utils.DEFAULT_SBLK_SIZE
+ remaining_sblks = self.last_record_offset %
qlslibs.utils.DEFAULT_SBLK_SIZE
if remaining_sblks == 0:
self.num_filler_records_required = 0
else:
- self.num_filler_records_required = (qls.utils.DEFAULT_SBLK_SIZE -
remaining_sblks) / \
- qls.utils.DEFAULT_DBLK_SIZE
+ self.num_filler_records_required =
(qlslibs.utils.DEFAULT_SBLK_SIZE - remaining_sblks) / \
+ qlslibs.utils.DEFAULT_DBLK_SIZE
self.fill_to_offset = self.last_record_offset + \
- (self.num_filler_records_required *
qls.utils.DEFAULT_DBLK_SIZE)
+ (self.num_filler_records_required *
qlslibs.utils.DEFAULT_DBLK_SIZE)
if self.args.show_recs or self.args.show_all_recs:
print '0x%x:0x%08x: %d filler records required for DBLK
alignment to 0x%08x' % \
(self.current_journal_file.file_header.file_num,
self.last_record_offset,
@@ -465,27 +466,27 @@ class Journal(object):
if not self._check_file():
return False
self.last_record_offset =
self.current_journal_file.file_header.file_handle.tell()
- this_record =
qls.utils.load(self.current_journal_file.file_header.file_handle,
qls.jrnl.RecordHeader)
+ this_record =
qlslibs.utils.load(self.current_journal_file.file_header.file_handle,
qlslibs.jrnl.RecordHeader)
if not
this_record.is_header_valid(self.current_journal_file.file_header):
return False
if self.first_rec_flag:
if this_record.file_offset !=
self.current_journal_file.file_header.first_record_offset:
- raise
qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header,
this_record)
+ raise
qlslibs.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header,
this_record)
self.first_rec_flag = False
self.statistics.total_record_count += 1
start_journal_file = self.current_journal_file
- if isinstance(this_record, qls.jrnl.EnqueueRecord):
+ if isinstance(this_record, qlslibs.jrnl.EnqueueRecord):
ok_flag = self._handle_enqueue_record(this_record,
start_journal_file)
high_rid_counter.check(this_record.record_id)
if self.args.show_recs or self.args.show_all_recs:
print '0x%x:%s' % (start_journal_file.file_header.file_num, \
this_record.to_string(self.args.show_xids,
self.args.show_data))
- elif isinstance(this_record, qls.jrnl.DequeueRecord):
+ elif isinstance(this_record, qlslibs.jrnl.DequeueRecord):
ok_flag = self._handle_dequeue_record(this_record,
start_journal_file)
high_rid_counter.check(this_record.record_id)
if self.args.show_recs or self.args.show_all_recs:
print '0x%x:%s' % (start_journal_file.file_header.file_num,
this_record.to_string(self.args.show_xids))
- elif isinstance(this_record, qls.jrnl.TransactionRecord):
+ elif isinstance(this_record, qlslibs.jrnl.TransactionRecord):
ok_flag = self._handle_transaction_record(this_record,
start_journal_file)
high_rid_counter.check(this_record.record_id)
if self.args.show_recs or self.args.show_all_recs:
@@ -495,7 +496,7 @@ class Journal(object):
ok_flag = True
if self.args.show_all_recs:
print '0x%x:%s' % (start_journal_file.file_header.file_num,
this_record)
- qls.utils.skip(self.current_journal_file.file_header.file_handle,
qls.utils.DEFAULT_DBLK_SIZE)
+ qlslibs.utils.skip(self.current_journal_file.file_header.file_handle,
qlslibs.utils.DEFAULT_DBLK_SIZE)
return ok_flag
def _handle_enqueue_record(self, enqueue_record, start_journal_file):
while
enqueue_record.load(self.current_journal_file.file_header.file_handle):
@@ -505,7 +506,7 @@ class Journal(object):
if not enqueue_record.is_valid(start_journal_file):
return False
if enqueue_record.is_external() and enqueue_record.data != None:
- raise
qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record)
+ raise
qlslibs.err.ExternalDataError(self.current_journal_file.file_header,
enqueue_record)
if enqueue_record.is_transient():
self.statistics.transient_record_count += 1
return True
@@ -538,7 +539,7 @@ class Journal(object):
else:
try:
self.enq_map.delete(start_journal_file,
dequeue_record)[0].decr_enq_cnt(dequeue_record)
- except qls.err.RecordIdNotFoundError:
+ except qlslibs.err.RecordIdNotFoundError:
dequeue_record.warnings.append('NOT IN EMAP')
self.statistics.dequeue_count += 1
return True
@@ -575,7 +576,7 @@ class JournalFile(object):
self.enq_cnt += 1
def decr_enq_cnt(self, record):
if self.enq_cnt <= self.deq_cnt:
- raise qls.err.EnqueueCountUnderflowError(self.file_header, record)
+ raise qlslibs.err.EnqueueCountUnderflowError(self.file_header,
record)
self.deq_cnt += 1
def get_enq_cnt(self):
return self.enq_cnt - self.deq_cnt
Modified: qpid/trunk/qpid/tools/src/py/qlslibs/efp.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/efp.py?rev=1596633&r1=1592994&r2=1596633&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qlslibs/efp.py (original)
+++ qpid/trunk/qpid/tools/src/py/qlslibs/efp.py Wed May 21 17:33:51 2014
@@ -18,14 +18,14 @@
#
"""
-Module: qls.efp
+Module: qlslibs.efp
Contains empty file pool (EFP) classes.
"""
import os
import os.path
-import qls.err
+import qlslibs.err
import shutil
import uuid
@@ -36,7 +36,7 @@ class EfpManager(object):
"""
def __init__(self, directory, disk_space_required_kb):
if not os.path.exists(directory):
- raise qls.err.InvalidQlsDirectoryNameError(directory)
+ raise qlslibs.err.InvalidQlsDirectoryNameError(directory)
self.directory = directory
self.disk_space_required_kb = disk_space_required_kb
self.efp_partitions = []
@@ -72,7 +72,7 @@ class EfpManager(object):
efp = self.current_efp_partition.efp_pools[file_size]
num_files_needed = num_files - efp.get_tot_file_count()
if num_files_needed > 0:
-
self.current_efp_partition.create_new_efp_files(qls.utils.efp_directory_size(file_size),
+
self.current_efp_partition.create_new_efp_files(qlslibs.utils.efp_directory_size(file_size),
num_files_needed)
else:
print ' WARNING: Pool %s in partition %s already contains
%d files: no action taken' % \
@@ -120,7 +120,7 @@ class EfpManager(object):
self.efp_pools[efpl].append(efp_partition.efp_pools[efpl])
self.total_num_files += efp_partition.tot_file_count
self.total_cum_file_size_kb += efp_partition.tot_file_size_kb
- except qls.err.InvalidPartitionDirectoryNameError:
+ except qlslibs.err.InvalidPartitionDirectoryNameError:
pass
def _check_args(self, arg_tup):
""" Value check of args. The names of partitions and pools are
validated against the discovered instances """
@@ -138,16 +138,16 @@ class EfpManager(object):
found = True
break
if not found:
- raise qls.err.PartitionDoesNotExistError(arg_partition)
+ raise qlslibs.err.PartitionDoesNotExistError(arg_partition)
except ValueError:
- raise qls.err.InvalidPartitionDirectoryNameError(arg_partition)
+ raise
qlslibs.err.InvalidPartitionDirectoryNameError(arg_partition)
if self.current_efp_partition is not None:
pool_list = self.current_efp_partition.efp_pools.keys()
efp_directory_name =
EmptyFilePool.get_directory_name(int(arg_file_size))
if arg_add and efp_directory_name in pool_list:
- raise
qls.err.PoolDirectoryAlreadyExistsError(efp_directory_name)
+ raise
qlslibs.err.PoolDirectoryAlreadyExistsError(efp_directory_name)
if (arg_remove or arg_freshen) and efp_directory_name not in
pool_list:
- raise
qls.err.PoolDirectoryDoesNotExistError(efp_directory_name)
+ raise
qlslibs.err.PoolDirectoryDoesNotExistError(efp_directory_name)
class EfpPartition(object):
"""
@@ -197,17 +197,18 @@ class EfpPartition(object):
self.efp_pools[dir_entry] = efp
def _validate_partition_directory(self, disk_space_required_kb):
if os.path.basename(self.directory)[0] is not
EfpPartition.PTN_DIR_PREFIX:
- raise qls.err.InvalidPartitionDirectoryNameError(self.directory)
+ raise
qlslibs.err.InvalidPartitionDirectoryNameError(self.directory)
try:
self.partition_number = int(os.path.basename(self.directory)[1:])
except ValueError:
- raise qls.err.InvalidPartitionDirectoryNameError(self.directory)
- if not qls.utils.has_write_permission(self.directory):
- raise qls.err.WritePermissionError(self.directory)
+ raise
qlslibs.err.InvalidPartitionDirectoryNameError(self.directory)
+ if not qlslibs.utils.has_write_permission(self.directory):
+ raise qlslibs.err.WritePermissionError(self.directory)
if disk_space_required_kb is not None:
- space_avail = qls.utils.get_avail_disk_space(self.directory)
+ space_avail = qlslibs.utils.get_avail_disk_space(self.directory)
if space_avail < (disk_space_required_kb * 1024):
- raise qls.err.InsufficientSpaceOnDiskError(self.directory,
space_avail, disk_space_required_kb * 1024)
+ raise qlslibs.err.InsufficientSpaceOnDiskError(self.directory,
space_avail,
+
disk_space_required_kb * 1024)
class EmptyFilePool(object):
"""
@@ -258,14 +259,15 @@ class EmptyFilePool(object):
def _create_new_efp_file(self):
""" Create a single new empty journal file of the prescribed size for
this EFP """
file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION
- file_header = qls.jrnl.FileHeader(0, qls.jrnl.FileHeader.MAGIC,
qls.utils.DEFAULT_RECORD_VERSION, 0, 0, 0)
- file_header.init(None, None, qls.utils.DEFAULT_HEADER_SIZE_SBLKS,
self.partition_number, self.data_size_kb,
+ file_header = qlslibs.jrnl.FileHeader(0,
qlslibs.jrnl.FileHeader.MAGIC, qlslibs.utils.DEFAULT_RECORD_VERSION,
+ 0, 0, 0)
+ file_header.init(None, None, qlslibs.utils.DEFAULT_HEADER_SIZE_SBLKS,
self.partition_number, self.data_size_kb,
0, 0, 0, 0, 0)
efh = file_header.encode()
efh_bytes = len(efh)
file_handle = open(os.path.join(self.directory, file_name), 'wb')
file_handle.write(efh)
- file_handle.write('\xff' * (qls.utils.DEFAULT_SBLK_SIZE - efh_bytes))
+ file_handle.write('\xff' * (qlslibs.utils.DEFAULT_SBLK_SIZE -
efh_bytes))
file_handle.write('\x00' * (int(self.data_size_kb) * 1024))
file_handle.close()
fqfn = os.path.join(self.directory, file_name)
@@ -273,22 +275,22 @@ class EmptyFilePool(object):
return os.path.getsize(fqfn)
def _validate_efp_directory(self):
if self.base_dir_name[-1] is not EmptyFilePool.EFP_DIR_SUFFIX:
- raise qls.err.InvalidEfpDirectoryNameError(self.directory)
+ raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory)
try:
self.data_size_kb = int(os.path.basename(self.base_dir_name)[:-1])
except ValueError:
- raise qls.err.InvalidEfpDirectoryNameError(self.directory)
+ raise qlslibs.err.InvalidEfpDirectoryNameError(self.directory)
def _validate_efp_file(self, efp_file):
file_size = os.path.getsize(efp_file)
- expected_file_size = (self.data_size_kb * 1024) +
qls.utils.DEFAULT_SBLK_SIZE
+ expected_file_size = (self.data_size_kb * 1024) +
qlslibs.utils.DEFAULT_SBLK_SIZE
if file_size != expected_file_size:
print 'WARNING: File %s not of correct size (size=%d,
expected=%d): Ignoring' % (efp_file, file_size,
expected_file_size)
return False
file_handle = open(efp_file)
- args = qls.utils.load_args(file_handle, qls.jrnl.RecordHeader)
- file_hdr = qls.jrnl.FileHeader(*args)
- file_hdr.init(file_handle, *qls.utils.load_args(file_handle,
qls.jrnl.FileHeader))
+ args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader)
+ file_hdr = qlslibs.jrnl.FileHeader(*args)
+ file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle,
qlslibs.jrnl.FileHeader))
if not file_hdr.is_header_valid(file_hdr):
file_handle.close()
return False
Modified: qpid/trunk/qpid/tools/src/py/qlslibs/err.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/err.py?rev=1596633&r1=1592994&r2=1596633&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qlslibs/err.py (original)
+++ qpid/trunk/qpid/tools/src/py/qlslibs/err.py Wed May 21 17:33:51 2014
@@ -18,7 +18,7 @@
#
"""
-Module: qls.err
+Module: qlslibs.err
Contains error classes.
"""
Modified: qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py?rev=1596633&r1=1592994&r2=1596633&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py (original)
+++ qpid/trunk/qpid/tools/src/py/qlslibs/jrnl.py Wed May 21 17:33:51 2014
@@ -18,13 +18,13 @@
#
"""
-Module: qls.jrnl
+Module: qlslibs.jrnl
Contains journal record classes.
"""
-import qls.err
-import qls.utils
+import qlslibs.err
+import qlslibs.utils
import string
import struct
import time
@@ -58,8 +58,8 @@ class RecordHeader(object):
if self.magic[:3] != 'QLS' or self.magic[3] not in ['a', 'c', 'd',
'e', 'f', 'x']:
return False
if self.magic[-1] != 'x':
- if self.version != qls.utils.DEFAULT_RECORD_VERSION:
- raise qls.err.InvalidRecordVersionError(file_header, self,
qls.utils.DEFAULT_RECORD_VERSION)
+ if self.version != qlslibs.utils.DEFAULT_RECORD_VERSION:
+ raise qlslibs.err.InvalidRecordVersionError(file_header, self,
qlslibs.utils.DEFAULT_RECORD_VERSION)
if self.serial != file_header.serial:
return False
return True
@@ -106,17 +106,17 @@ class RecordTail(object):
if self.valid_flag is None:
if not self.complete:
return False
- self.valid_flag = qls.utils.inv_str(self.xmagic) == record.magic
and \
+ self.valid_flag = qlslibs.utils.inv_str(self.xmagic) ==
record.magic and \
self.serial == record.serial and \
self.record_id == record.record_id and \
- qls.utils.adler32(record.checksum_encode()) ==
self.checksum
+ qlslibs.utils.adler32(record.checksum_encode())
== self.checksum
return self.valid_flag
def to_string(self):
"""Return a string representation of the this RecordTail instance"""
if self.valid_flag is not None:
if not self.valid_flag:
return '[INVALID RECORD TAIL]'
- magic = qls.utils.inv_str(self.xmagic)
+ magic = qlslibs.utils.inv_str(self.xmagic)
magic_char = magic[-1].upper() if magic[-1] in string.printable else
'?'
return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum,
self.record_id)
def __str__(self):
@@ -150,7 +150,7 @@ class FileHeader(RecordHeader):
self.queue_name_len) +
self.queue_name
def get_file_size(self):
"""Sum of file header size and data size"""
- return (self.file_header_size_sblks * qls.utils.DEFAULT_SBLK_SIZE) +
(self.efp_data_size_kb * 1024)
+ return (self.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE)
+ (self.efp_data_size_kb * 1024)
def load(self, file_handle):
self.queue_name = file_handle.read(self.queue_name_len)
def is_end_of_file(self):
@@ -225,13 +225,13 @@ class EnqueueRecord(RecordHeader):
return True
def load(self, file_handle):
"""Return True when load is incomplete and must be called again with
new file handle"""
- self.xid, self.xid_complete = qls.utils.load_data(file_handle,
self.xid, self.xid_size)
+ self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle,
self.xid, self.xid_size)
if not self.xid_complete:
return True
if self.is_external():
self.data_complete = True
else:
- self.data, self.data_complete = qls.utils.load_data(file_handle,
self.data, self.data_size)
+ self.data, self.data_complete =
qlslibs.utils.load_data(file_handle, self.data, self.data_size)
if not self.data_complete:
return True
if self.xid_size > 0 or self.data_size > 0:
@@ -254,8 +254,8 @@ class EnqueueRecord(RecordHeader):
else:
record_tail_str = self.record_tail.to_string()
return '%s %s %s %s %s %s' % (self.to_rh_string(),
- qls.utils.format_xid(self.xid,
self.xid_size, show_xid_flag),
- qls.utils.format_data(self.data,
self.data_size, show_data_flag),
+ qlslibs.utils.format_xid(self.xid,
self.xid_size, show_xid_flag),
+ qlslibs.utils.format_data(self.data,
self.data_size, show_data_flag),
record_tail_str, self._print_flags(),
self._get_warnings())
def _print_flags(self):
"""Utility function to decode the flags field in the header and print
a string representation"""
@@ -305,7 +305,7 @@ class DequeueRecord(RecordHeader):
return True
def load(self, file_handle):
"""Return True when load is incomplete and must be called again with
new file handle"""
- self.xid, self.xid_complete = qls.utils.load_data(file_handle,
self.xid, self.xid_size)
+ self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle,
self.xid, self.xid_size)
if not self.xid_complete:
return True
if self.xid_size > 0:
@@ -329,7 +329,7 @@ class DequeueRecord(RecordHeader):
else:
record_tail_str = self.record_tail.to_string()
return '%s drid=0x%x %s %s %s %s' % (self.to_rh_string(),
self.dequeue_record_id,
- qls.utils.format_xid(self.xid,
self.xid_size, show_xid_flag),
+
qlslibs.utils.format_xid(self.xid, self.xid_size, show_xid_flag),
record_tail_str,
self._print_flags(), self._get_warnings())
def _print_flags(self):
"""Utility function to decode the flags field in the header and print
a string representation"""
@@ -366,7 +366,7 @@ class TransactionRecord(RecordHeader):
return True
def load(self, file_handle):
"""Return True when load is incomplete and must be called again with
new file handle"""
- self.xid, self.xid_complete = qls.utils.load_data(file_handle,
self.xid, self.xid_size)
+ self.xid, self.xid_complete = qlslibs.utils.load_data(file_handle,
self.xid, self.xid_size)
if not self.xid_complete:
return True
if self.xid_size > 0:
@@ -388,7 +388,7 @@ class TransactionRecord(RecordHeader):
else:
record_tail_str = self.record_tail.to_string()
return '%s %s %s %s' % (self.to_rh_string(),
- qls.utils.format_xid(self.xid, self.xid_size,
show_xid_flag),
+ qlslibs.utils.format_xid(self.xid,
self.xid_size, show_xid_flag),
record_tail_str, self._get_warnings())
def __str__(self):
"""Return a string representation of the this TransactionRecord
instance"""
Modified: qpid/trunk/qpid/tools/src/py/qlslibs/utils.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qlslibs/utils.py?rev=1596633&r1=1592994&r2=1596633&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qlslibs/utils.py (original)
+++ qpid/trunk/qpid/tools/src/py/qlslibs/utils.py Wed May 21 17:33:51 2014
@@ -18,13 +18,13 @@
#
"""
-Module: qls.utils
+Module: qlslibs.utils
Contains helper functions for qpid_qls_analyze.
"""
import os
-import qls.jrnl
+import qlslibs.jrnl
import stat
import string
import struct
@@ -43,18 +43,18 @@ def adler32(data):
def create_record(magic, uflags, journal_file, record_id, dequeue_record_id,
xid, data):
"""Helper function to construct a record with xid, data (where applicable)
and consistent tail with checksum"""
- record_class = qls.jrnl.CLASSES.get(magic[-1])
+ record_class = qlslibs.jrnl.CLASSES.get(magic[-1])
record = record_class(0, magic, DEFAULT_RECORD_VERSION, uflags,
journal_file.file_header.serial, record_id)
xid_length = len(xid) if xid is not None else 0
- if isinstance(record, qls.jrnl.EnqueueRecord):
+ if isinstance(record, qlslibs.jrnl.EnqueueRecord):
data_length = len(data) if data is not None else 0
record.init(None, xid_length, data_length)
- elif isinstance(record, qls.jrnl.DequeueRecord):
+ elif isinstance(record, qlslibs.jrnl.DequeueRecord):
record.init(None, dequeue_record_id, xid_length)
- elif isinstance(record, qls.jrnl.TransactionRecord):
+ elif isinstance(record, qlslibs.jrnl.TransactionRecord):
record.init(None, xid_length)
else:
- raise qls.err.InvalidClassError(record.__class__.__name__)
+ raise qlslibs.err.InvalidClassError(record.__class__.__name__)
if xid is not None:
record.xid = xid
record.xid_complete = True
@@ -75,11 +75,11 @@ def efp_directory_size(directory_name):
def format_data(data, data_size=None, show_data_flag=True):
"""Format binary data for printing"""
- return _format_binary(data, data_size, show_data_flag, 'data',
qls.err.DataSizeError, False)
+ return _format_binary(data, data_size, show_data_flag, 'data',
qlslibs.err.DataSizeError, False)
def format_xid(xid, xid_size=None, show_xid_flag=True):
"""Format binary XID for printing"""
- return _format_binary(xid, xid_size, show_xid_flag, 'xid',
qls.err.XidSizeError, True)
+ return _format_binary(xid, xid_size, show_xid_flag, 'xid',
qlslibs.err.XidSizeError, True)
def get_avail_disk_space(path):
df_proc = subprocess.Popen(["df", path], stdout=subprocess.PIPE)
@@ -112,7 +112,7 @@ def load_args(file_handle, klass):
foffs = file_handle.tell(),
fbin = file_handle.read(size)
if len(fbin) != size:
- raise qls.err.UnexpectedEndOfFileError(len(fbin), size, foffs,
file_handle.name)
+ raise qlslibs.err.UnexpectedEndOfFileError(len(fbin), size, foffs,
file_handle.name)
return foffs + struct.unpack(klass.FORMAT, fbin)
def load_data(file_handle, element, element_size):
@@ -179,7 +179,7 @@ def _is_printable(in_str):
return True
def _mk_record_tail(record):
- record_tail = qls.jrnl.RecordTail(None)
+ record_tail = qlslibs.jrnl.RecordTail(None)
record_tail.xmagic = inv_str(record.magic)
record_tail.checksum = adler32(record.checksum_encode())
record_tail.serial = record.serial
Copied: qpid/trunk/qpid/tools/src/py/qpid-qls-analyze (from r1592994,
qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py)
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-qls-analyze?p2=qpid/trunk/qpid/tools/src/py/qpid-qls-analyze&p1=qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py&r1=1592994&r2=1596633&rev=1596633&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-qls-analyze Wed May 21 17:33:51 2014
@@ -23,15 +23,24 @@ qpid-qls-analyze
Reads and analyzes a Qpid Linear Store (QLS) store directory.
"""
+import os.path
+import sys
+
+default = os.path.normpath('/usr/share/qpid-tools')
+home = os.environ.get('QPID_TOOLS_HOME', default)
+sys.path.append(os.path.join(home,'python'))
+
import argparse
import os
-import os.path
-import qls.anal
-import qls.efp
+import qlslibs.anal
+import qlslibs.efp
class QlsAnalyzerArgParser(argparse.ArgumentParser):
+ """
+ Class to handle command-line arguments.
+ """
def __init__(self):
- argparse.ArgumentParser.__init__(self, description = 'Qpid Linear
Store Analyzer', prog = 'qpid-qls-analyze')
+ argparse.ArgumentParser.__init__(self, description='Qpid Linear Store
Analyzer', prog='qpid-qls-analyze')
self.add_argument('qls_dir', metavar='DIR',
help='Qpid Linear Store (QLS) directory to be
analyzed')
self.add_argument('--efp', action='store_true',
@@ -68,18 +77,21 @@ class QqpdLinearStoreAnalyzer(object):
self.args = None
self._process_args()
self.qls_dir = os.path.abspath(self.args.qls_dir)
- self.efp_manager = qls.efp.EfpManager(self.qls_dir, None)
- self.jrnl_recovery_mgr = qls.anal.JournalRecoveryManager(self.qls_dir,
self.args)
+ self.efp_manager = qlslibs.efp.EfpManager(self.qls_dir, None)
+ self.jrnl_recovery_mgr =
qlslibs.anal.JournalRecoveryManager(self.qls_dir, self.args)
def _process_args(self):
+ """ Create arg parser and process args """
parser = QlsAnalyzerArgParser()
self.args = parser.parse_args()
if not os.path.exists(self.args.qls_dir):
parser.error('Journal path "%s" does not exist' %
self.args.qls_dir)
def report(self):
+ """ Create a report on the linear store previously analyzed using
analyze() """
if self.args.efp:
self.efp_manager.report()
self.jrnl_recovery_mgr.report(self.args.stats)
def run(self):
+ """ Run the analyzer, which reads and analyzes the linear store """
if self.args.efp:
self.efp_manager.run(None)
self.jrnl_recovery_mgr.run()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org