Author: kpvdr
Date: Thu Jan  9 17:19:59 2014
New Revision: 1556888

URL: http://svn.apache.org/r1556888
Log:
QPID-5362: WIP: Linearstore: No store tools exist for examining the journals. 
This checkin is work-in-progress.

Added:
    qpid/trunk/qpid/tools/src/py/qls/
    qpid/trunk/qpid/tools/src/py/qls/__init__.py
    qpid/trunk/qpid/tools/src/py/qls/efp.py
    qpid/trunk/qpid/tools/src/py/qls/err.py
    qpid/trunk/qpid/tools/src/py/qls/jrnl.py
    qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py   (with props)

Added: qpid/trunk/qpid/tools/src/py/qls/__init__.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qls/__init__.py?rev=1556888&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qls/__init__.py (added)
+++ qpid/trunk/qpid/tools/src/py/qls/__init__.py Thu Jan  9 17:19:59 2014
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+

Added: qpid/trunk/qpid/tools/src/py/qls/efp.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qls/efp.py?rev=1556888&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qls/efp.py (added)
+++ qpid/trunk/qpid/tools/src/py/qls/efp.py Thu Jan  9 17:19:59 2014
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import os.path
+import qls.err
+
+class EfpManager(object):
+    """
+    Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the
+    Empty File Pool (EFP).
+    """
+    def __init__(self, directory):
+        """Remember the QLS directory; raises InvalidQlsDirectoryNameError if it does not exist"""
+        if not os.path.exists(directory):
+            raise qls.err.InvalidQlsDirectoryNameError(directory)
+        self.directory = directory
+        self.partitions = [] # EfpPartition objects, filled in by run()
+    def report(self):
+        """Print the partition count, a one-line-per-partition table, then each partition's own report"""
+        print 'Found', len(self.partitions), 'partition(s).'
+        if self.partitions:
+            EfpPartition.print_report_table_header()
+            for ptn in self.partitions:
+                ptn.print_report_table_line()
+            print
+            for ptn in self.partitions:
+                ptn.report()
+    def run(self, args):
+        """Scan each entry of the QLS directory, keeping those that are EFP partitions (args is not used)"""
+        for dir_entry in os.listdir(self.directory):
+            try:
+                efpp = EfpPartition(os.path.join(self.directory, dir_entry))
+                efpp.scan()
+                self.partitions.append(efpp)
+            except qls.err.InvalidPartitionDirectoryNameError:
+                pass # Entry is not named like a partition directory: ignore it
+
+class EfpPartition(object):
+    """
+    Class that represents an EFP partition. Each partition contains one or more Empty File Pools (EFPs).
+    """
+    PTN_DIR_PREFIX = 'p'
+    EFP_DIR_NAME = 'efp'
+    def __init__(self, directory):
+        """
+        Parse the partition number out of a directory whose base name is PTN_DIR_PREFIX followed by an integer.
+        Raises InvalidPartitionDirectoryNameError if the prefix is missing or the remainder is not an integer.
+        """
+        self.base_dir = os.path.basename(directory)
+        # Compare by value: 'is not' tests object identity and only works when the interpreter happens to
+        # share the two string objects. startswith() also copes with an empty base name.
+        if not self.base_dir.startswith(EfpPartition.PTN_DIR_PREFIX):
+            raise qls.err.InvalidPartitionDirectoryNameError(directory)
+        try:
+            self.partition_number = int(self.base_dir[len(EfpPartition.PTN_DIR_PREFIX):])
+        except ValueError:
+            raise qls.err.InvalidPartitionDirectoryNameError(directory)
+        self.directory = directory
+        self.pools = [] # EmptyFilePool objects, filled in by scan()
+        self.efp_count = 0
+        self.tot_file_count = 0
+        self.tot_file_size_kb = 0
+    def get_directory(self):
+        return self.directory
+    def get_efp_count(self):
+        return self.efp_count
+    def get_name(self):
+        return self.base_dir
+    def get_number(self):
+        return self.partition_number
+    def get_number_pools(self):
+        return len(self.pools)
+    def get_tot_file_count(self):
+        return self.tot_file_count
+    def get_tot_file_size_kb(self):
+        return self.tot_file_size_kb
+    @staticmethod
+    def print_report_table_header():
+        """Print the column headings matching print_report_table_line()"""
+        print 'p_no no_efp tot_files tot_size_kb directory'
+        print '---- ------ --------- ----------- ---------'
+    def print_report_table_line(self):
+        """Print this partition's totals as one row of the partition table"""
+        print '%4d %6d %9d %11d %s' % (self.get_number(), self.get_efp_count(), self.get_tot_file_count(),
+                                       self.get_tot_file_size_kb(), self.get_directory())
+    def report(self):
+        """Print a table with one row for each pool found in this partition"""
+        print 'Partition %s:' % self.base_dir
+        EmptyFilePool.print_report_table_header()
+        for pool in self.pools:
+            pool.print_report_table_line()
+        print
+    def scan(self):
+        """Create an EmptyFilePool for each entry of this partition's efp directory and add up the totals"""
+        efp_dir = os.path.join(self.directory, EfpPartition.EFP_DIR_NAME)
+        if os.path.isdir(efp_dir): # A partition with no efp directory simply holds no pools
+            for dir_entry in os.listdir(efp_dir):
+                efp = EmptyFilePool(os.path.join(efp_dir, dir_entry))
+                self.efp_count += 1
+                self.tot_file_count += efp.get_tot_file_count()
+                self.tot_file_size_kb += efp.get_tot_file_size_kb()
+                self.pools.append(efp)
+
+class EmptyFilePool(object):
+    """
+    Class that represents a single Empty File Pool within a partition. Each EFP contains pre-formatted linear store
+    journal files (but it may also be empty).
+    """
+    EFP_DIR_SUFFIX = 'k'
+    def __init__(self, directory):
+        """
+        Parse the file data size (in kB) out of a directory whose base name is an integer followed by
+        EFP_DIR_SUFFIX, and list the files it holds.
+        Raises InvalidEfpDirectoryNameError if the suffix is missing or the remainder is not an integer.
+        """
+        self.base_dir = os.path.basename(directory)
+        # Compare by value, not identity; endswith() also copes with an empty base name
+        if not self.base_dir.endswith(EmptyFilePool.EFP_DIR_SUFFIX):
+            raise qls.err.InvalidEfpDirectoryNameError(directory)
+        try:
+            self.data_size_kb = int(self.base_dir[:-len(EmptyFilePool.EFP_DIR_SUFFIX)])
+        except ValueError:
+            raise qls.err.InvalidEfpDirectoryNameError(directory)
+        self.directory = directory
+        self.files = os.listdir(directory)
+    def get_directory(self):
+        return self.directory
+    def get_file_data_size_kb(self):
+        return self.data_size_kb
+    def get_name(self):
+        return self.base_dir
+    def get_tot_file_count(self):
+        return len(self.files)
+    def get_tot_file_size_kb(self):
+        """Return the pool size computed as (size taken from the directory name) * (number of files)"""
+        return self.data_size_kb * len(self.files)
+    @staticmethod
+    def print_report_table_header():
+        """Print the column headings matching print_report_table_line()"""
+        print 'file_size_kb file_count tot_file_size_kb efp_directory'
+        print '------------ ---------- ---------------- -------------'
+    def print_report_table_line(self):
+        """Print this pool's figures as one row of the pool table"""
+        print '%12d %10d %16d %s' % (self.get_file_data_size_kb(), self.get_tot_file_count(),
+                                     self.get_tot_file_size_kb(), self.get_directory())
+
+# =============================================================================
+
+if __name__ == "__main__":
+    print "This is a library, and cannot be executed."

Added: qpid/trunk/qpid/tools/src/py/qls/err.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qls/err.py?rev=1556888&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qls/err.py (added)
+++ qpid/trunk/qpid/tools/src/py/qls/err.py Thu Jan  9 17:19:59 2014
@@ -0,0 +1,177 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# --- Parent classes
+
+class QlsError(Exception):
+    """Root of the QLS exception hierarchy; takes no arguments of its own"""
+    def __init__(self):
+        super(QlsError, self).__init__()
+
+class QlsRecordError(QlsError):
+    """Base error class for individual records"""
+    def __init__(self, file_header, record):
+        """Keep the file header and the record that the error refers to"""
+        QlsError.__init__(self)
+        self.file_header = file_header
+        self.record = record
+    def __str__(self):
+        """Return the queue name, file number, record offset and record id that locate the record"""
+        return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \
+            (self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id)
+
+# --- Error classes
+
+class AlreadyLockedError(QlsRecordError):
+    """Transactional record to be locked is already locked"""
+    def __init__(self, file_header, record):
+        QlsRecordError.__init__(self, file_header, record)
+    def __str__(self):
+        """Return the error description followed by the record location from QlsRecordError"""
+        return 'Transactional operation already locked in TransactionMap: ' + QlsRecordError.__str__(self)
+
+class DatqaSizeError(QlsError):
+    """Error class for Data size mismatch"""
+    # NOTE(review): the class name looks like a typo for DataSizeError; it is left unchanged here because
+    # callers outside this file may already refer to it.
+    def __init__(self, expected_size, actual_size, data_str):
+        """Keep the expected and actual data sizes and the data (as a string) for the message"""
+        QlsError.__init__(self)
+        self.expected_size = expected_size
+        self.actual_size = actual_size
+        self.data_str = data_str # Was stored as xid_str, which made __str__() fail with AttributeError
+        self.xid_str = data_str # Kept so that any code reading the old attribute name still works
+    def __str__(self):
+        return 'Inconsistent data size: expected:%d; actual:%d; data="%s"' % \
+            (self.expected_size, self.actual_size, self.data_str)
+
+class DuplicateRecordIdError(QlsRecordError):
+    """Duplicate Record Id in Enqueue Map"""
+    def __init__(self, file_header, record):
+        QlsRecordError.__init__(self, file_header, record)
+    def __str__(self):
+        """Return the error description followed by the record location from QlsRecordError"""
+        return 'Duplicate Record Id in enqueue map: ' + QlsRecordError.__str__(self)
+
+class ExternalDataError(QlsRecordError):
+    """Data present in Enqueue record when external data flag is set"""
+    def __init__(self, file_header, record):
+        QlsRecordError.__init__(self, file_header, record)
+    def __str__(self):
+        """Return the error description followed by the record location from QlsRecordError"""
+        return 'Data present in external data record: ' + QlsRecordError.__str__(self)
+
+class FirstRecordOffsetMismatchError(QlsRecordError):
+    """First Record Offset (FRO) does not match file header"""
+    def __init__(self, file_header, record):
+        QlsRecordError.__init__(self, file_header, record)
+    def __str__(self):
+        """Return the record location followed by the first record offset held in the file header"""
+        return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \
+            self.file_header.first_record_offset
+
+class InvalidEfpDirectoryNameError(QlsError):
+    """Invalid EFP directory name - should be NNNNk, where NNNN is a number (of any length)"""
+    def __init__(self, directory_name):
+        """Keep the offending directory name for the message"""
+        QlsError.__init__(self)
+        self.directory_name = directory_name
+    def __str__(self):
+        return 'Invalid EFP directory name "%s"' % self.directory_name
+
+class InvalidPartitionDirectoryNameError(QlsError):
+    """Invalid EFP partition name - should be pNNN, where NNN is the partition number"""
+    def __init__(self, directory_name):
+        """Keep the offending directory name for the message"""
+        QlsError.__init__(self)
+        self.directory_name = directory_name
+    def __str__(self):
+        return 'Invalid partition directory name "%s"' % self.directory_name
+
+class InvalidQlsDirectoryNameError(QlsError):
+    """The directory given as the QLS directory is not valid"""
+    def __init__(self, directory_name):
+        """Keep the offending directory name for the message"""
+        self.directory_name = directory_name
+        QlsError.__init__(self)
+    def __str__(self):
+        name = self.directory_name
+        return 'Invalid QLS directory name "%s"' % name
+
+class InvalidRecordTypeError(QlsRecordError):
+    """Error class for any operation using an invalid record type"""
+    def __init__(self, file_header, record, error_msg):
+        """Keep the record location plus a string describing the operation that was attempted"""
+        QlsRecordError.__init__(self, file_header, record)
+        self.error_msg = error_msg
+    def __str__(self):
+        return 'Invalid record type: ' + QlsRecordError.__str__(self) + ':' + self.error_msg
+
+class InvalidRecordVersionError(QlsRecordError):
+    """Invalid record version"""
+    def __init__(self, file_header, record, expected_version):
+        """Keep the record location plus the version number that was expected"""
+        QlsRecordError.__init__(self, file_header, record)
+        self.expected_version = expected_version
+    def __str__(self):
+        # The queue name is already part of QlsRecordError.__str__(), so the stray unformatted 'queue="%s"'
+        # has been dropped. The version is read from self.record because self.record_header is never set.
+        # NOTE(review): assumes the record object exposes a 'version' attribute - confirm against RecordHeader.
+        return 'Invalid record version: ' + QlsRecordError.__str__(self) + \
+            ' ver_found=0x%x ver_expected=0x%x' % (self.record.version, self.expected_version)
+
+class NoMoreFilesInJournalError(QlsError):
+    """Raised when trying to obtain the next file in the journal and there are no more files"""
+    def __init__(self, queue_name):
+        """Keep the name of the queue whose journal ran out of files"""
+        QlsError.__init__(self)
+        self.queue_name = queue_name
+    def __str__(self):
+        return 'No more journal files in queue "%s"' % self.queue_name
+
+class NonTransactionalRecordError(QlsRecordError):
+    """Transactional operation on non-transactional record"""
+    def __init__(self, file_header, record, operation):
+        """Keep the record location plus the name of the operation that was attempted"""
+        QlsRecordError.__init__(self, file_header, record)
+        self.operation = operation
+    def __str__(self):
+        # The unbound parent method must be given self explicitly; without it this raised TypeError
+        return 'Transactional operation on non-transactional record: ' + QlsRecordError.__str__(self) + \
+            ' operation=%s' % self.operation
+
+class RecordIdNotFoundError(QlsRecordError):
+    """Record Id not found in enqueue map"""
+    def __init__(self, file_header, record):
+        QlsRecordError.__init__(self, file_header, record)
+    def __str__(self):
+        # The unbound parent method must be given self explicitly; without it this raised TypeError
+        return 'Record Id not found in enqueue map: ' + QlsRecordError.__str__(self)
+
+class RecordNotLockedError(QlsRecordError):
+    """Record in enqueue map is not locked"""
+    def __init__(self, file_header, record):
+        QlsRecordError.__init__(self, file_header, record)
+    def __str__(self):
+        # The unbound parent method must be given self explicitly; without it this raised TypeError
+        return 'Record in enqueue map is not locked: ' + QlsRecordError.__str__(self)
+
+class UnexpectedEndOfFileError(QlsError):
+    """The number of bytes read from a file is less than that expected"""
+    def __init__(self, size_read, size_expected, file_offset, file_name):
+        """Keep the number of bytes read, the number expected, and where in which file the read was made"""
+        QlsError.__init__(self)
+        self.size_read = size_read
+        self.size_expected = size_expected
+        self.file_offset = file_offset
+        self.file_name = file_name
+    def __str__(self):
+        # "Tried to read" is the expected size and "only read" is the size actually read (these were swapped)
+        return 'Tried to read %d at offset %d in file "%s"; only read %d' % \
+            (self.size_expected, self.file_offset, self.file_name, self.size_read)
+
+class XidSizeError(QlsError):
+    """The size of an xid differs from the size that was expected"""
+    def __init__(self, expected_size, actual_size, xid_str):
+        """Keep both sizes and the xid (as a string) for the message"""
+        QlsError.__init__(self)
+        self.xid_str = xid_str
+        self.actual_size = actual_size
+        self.expected_size = expected_size
+    def __str__(self):
+        values = (self.expected_size, self.actual_size, self.xid_str)
+        return 'Inconsistent xid size: expected:%d; actual:%d; xid="%s"' % values
+
+# =============================================================================
+
+if __name__ == "__main__":
+    print "This is a library, and cannot be executed."

Added: qpid/trunk/qpid/tools/src/py/qls/jrnl.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qls/jrnl.py?rev=1556888&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qls/jrnl.py (added)
+++ qpid/trunk/qpid/tools/src/py/qls/jrnl.py Thu Jan  9 17:19:59 2014
@@ -0,0 +1,762 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import os.path
+import qls.err
+import struct
+from time import gmtime, strftime
+
+class HighCounter(object):
+    """Remembers the highest number it has been shown and can hand out the numbers that follow it"""
+    def __init__(self):
+        self.num = 0
+    def check(self, num):
+        """Raise the stored value to num if num is greater than it"""
+        self.num = max(self.num, num)
+    def get(self):
+        """Return the stored value"""
+        return self.num
+    def get_next(self):
+        """Increment the stored value and return the new value"""
+        self.num = self.num + 1
+        return self.num
+
+class JournalRecoveryManager(object):
+    """
+    Recovers the journal in the tpl directory (if present) and then each journal found in the jrnl
+    directory of a QLS directory.
+    """
+    TPL_DIR_NAME = 'tpl'
+    JRNL_DIR_NAME = 'jrnl'
+    def __init__(self, directory):
+        """Remember the QLS directory; raises InvalidQlsDirectoryNameError if it does not exist"""
+        if not os.path.exists(directory):
+            raise qls.err.InvalidQlsDirectoryNameError(directory)
+        self.directory = directory
+        self.tpl = None # Journal for the tpl directory, set by run() if that directory exists
+        self.journals = {} # queue name -> Journal, filled in by run()
+        self.high_rid_counter = HighCounter()
+    def report(self, print_stats_flag):
+        """Report on the TPL journal (if any) and then on each queue journal in queue name order"""
+        if self.tpl is not None:
+            self.tpl.report(print_stats_flag)
+        for queue_name in sorted(self.journals.keys()):
+            self.journals[queue_name].report(print_stats_flag)
+    def run(self, args):
+        """
+        Recover the TPL journal, then every journal under the jrnl directory. If args.txn is set, each queue
+        journal's outstanding transactions are reconciled against those outstanding in the TPL.
+        """
+        tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
+        if os.path.exists(tpl_dir):
+            # tpl_dir already starts with self.directory; joining it again doubled relative paths
+            self.tpl = Journal(tpl_dir, None)
+            self.tpl.recover(self.high_rid_counter)
+            print
+        jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
+        if os.path.exists(jrnl_dir):
+            # With no TPL there is nothing outstanding in it. An empty list is used rather than None
+            # because Journal treats None as meaning "this is the TPL instance".
+            if self.tpl is None:
+                xid_prepared_list = []
+            else:
+                xid_prepared_list = self.tpl.get_outstanding_transaction_list()
+            for dir_entry in os.listdir(jrnl_dir):
+                jrnl = Journal(os.path.join(jrnl_dir, dir_entry), xid_prepared_list)
+                jrnl.recover(self.high_rid_counter)
+                self.journals[jrnl.get_queue_name()] = jrnl
+                if args.txn:
+                    jrnl.reconcile_transactions(self.high_rid_counter)
+                print
+
+class EnqueueMap(object):
+    """
+    Map of enqueued records in a QLS journal
+    """
+    def __init__(self, journal):
+        self.journal = journal
+        self.enq_map = {} # record_id -> [file_num, enqueue record, locked_flag]
+    def add(self, file_header, enq_record, locked_flag):
+        """Add an enqueue record to the map; raises DuplicateRecordIdError if its record id is already present"""
+        if enq_record.record_id in self.enq_map:
+            # Report the header passed in with the record, as the other methods of this class do
+            raise qls.err.DuplicateRecordIdError(file_header, enq_record)
+        self.enq_map[enq_record.record_id] = [file_header.file_num, enq_record, locked_flag]
+    def contains(self, rid):
+        """Return True if the map contains the given rid"""
+        return rid in self.enq_map
+    def delete(self, file_header, deq_record):
+        """Remove the record that deq_record dequeues; raises RecordIdNotFoundError if it is not in the map"""
+        if deq_record.dequeue_record_id in self.enq_map:
+            del self.enq_map[deq_record.dequeue_record_id]
+        else:
+            raise qls.err.RecordIdNotFoundError(file_header, deq_record)
+    def lock(self, file_header, dequeue_record):
+        """Set the transaction lock for the record that dequeue_record dequeues to True"""
+        if dequeue_record.dequeue_record_id not in self.enq_map:
+            raise qls.err.RecordIdNotFoundError(file_header, dequeue_record)
+        self.enq_map[dequeue_record.dequeue_record_id][2] = True
+    def report_str(self, show_stats, show_records):
+        """Return a string containing a text report for all records in the map (show_stats is not used)"""
+        if not self.enq_map:
+            return 'No enqueued records found.'
+        rstr = '%d enqueued records found' % len(self.enq_map)
+        if show_records:
+            rstr += ":"
+            for rid in sorted(self.enq_map):
+                file_num, record, locked_flag = self.enq_map[rid]
+                if locked_flag:
+                    lock_str = '[LOCKED]'
+                else:
+                    lock_str = ''
+                rstr += '\n  %d:%s %s' % (file_num, record, lock_str)
+        else:
+            rstr += '.'
+        return rstr
+    def unlock(self, file_header, dequeue_record):
+        """Set the transaction lock for a given record_id to False"""
+        if dequeue_record.dequeue_record_id in self.enq_map:
+            if self.enq_map[dequeue_record.dequeue_record_id][2]:
+                self.enq_map[dequeue_record.dequeue_record_id][2] = False
+            else:
+                raise qls.err.RecordNotLockedError(file_header, dequeue_record)
+        else:
+            raise qls.err.RecordIdNotFoundError(file_header, dequeue_record)
+
+class TransactionMap(object):
+    """
+    Map of open transactions used while recovering a QLS journal
+    """
+    def __init__(self, enq_map):
+        self.txn_map = {} # xid -> list of [file_header, record, lock_flag] operations
+        self.enq_map = enq_map
+    def abort(self, xid):
+        """Perform an abort operation for the given xid record"""
+        for file_header, record, _ in self.txn_map[xid]:
+            if isinstance(record, DequeueRecord):
+                # An aborted dequeue leaves the enqueued record in place, so only its lock is released
+                if self.enq_map.contains(record.dequeue_record_id):
+                    self.enq_map.unlock(file_header, record)
+        del self.txn_map[xid]
+    def add(self, file_header, record):
+        """
+        Add a transactional record to the list held for its xid. For a dequeue record, the record it
+        dequeues is locked first. Raises NonTransactionalRecordError if the record has no xid.
+        """
+        if record.xid is None:
+            raise qls.err.NonTransactionalRecordError(file_header, record, 'TransactionMap.add()')
+        if isinstance(record, DequeueRecord):
+            try:
+                self.enq_map.lock(file_header, record)
+            except qls.err.RecordIdNotFoundError:
+                # Not in emap, look for rid in tmap - should not happen in practice
+                txn_op = self._find_record_id(record.xid, record.dequeue_record_id)
+                if txn_op is not None:
+                    if txn_op[2]:
+                        raise qls.err.AlreadyLockedError(file_header, record)
+                    txn_op[2] = True
+        if record.xid in self.txn_map:
+            self.txn_map[record.xid].append([file_header, record, False]) # append to existing list
+        else:
+            self.txn_map[record.xid] = [[file_header, record, False]] # create new list
+    def commit(self, xid):
+        """
+        Perform a commit operation for the given xid record. Returns a list of the dequeue record ids
+        (as hex strings) that were not found in the enqueue map.
+        """
+        mismatch_list = []
+        for file_header, record, lock in self.txn_map[xid]:
+            if isinstance(record, EnqueueRecord):
+                self.enq_map.add(file_header, record, lock) # Transfer enq to emap
+            else:
+                if self.enq_map.contains(record.dequeue_record_id):
+                    self.enq_map.unlock(file_header, record)
+                    self.enq_map.delete(file_header, record)
+                else:
+                    mismatch_list.append('0x%x' % record.dequeue_record_id)
+        del self.txn_map[xid]
+        return mismatch_list
+    def contains(self, xid):
+        """Return True if the xid exists in the map; False otherwise"""
+        return xid in self.txn_map
+    def delete(self, file_header, transaction_record):
+        """Remove a transaction record from the map using either a commit or abort header"""
+        # The last character of the record magic selects the operation: 'c' commits, 'a' aborts
+        if transaction_record.magic[-1] == 'c':
+            return self.commit(transaction_record.xid)
+        if transaction_record.magic[-1] == 'a':
+            self.abort(transaction_record.xid)
+        else:
+            raise qls.err.InvalidRecordTypeError(file_header, transaction_record, 'delete from Transaction Map')
+    def get_xid_list(self):
+        """Return the xids of all transactions currently in the map"""
+        return self.txn_map.keys()
+    def report_str(self, show_stats, show_records):
+        """Return a string containing a text report for all records in the map (show_stats is not used)"""
+        if not self.txn_map:
+            return 'No outstanding transactions found.'
+        rstr = '%d outstanding transaction(s)' % len(self.txn_map)
+        if show_records:
+            rstr += ':'
+            for xid, op_list in self.txn_map.iteritems():
+                rstr += '\n  %s containing %d operations:' % (Utils.format_xid(xid), len(op_list))
+                for file_header, record, _ in op_list:
+                    rstr += '\n    %d:%s' % (file_header.file_num, record)
+        else:
+            rstr += '.'
+        return rstr
+    def _find_record_id(self, xid, record_id):
+        """Search for and return the map entry whose record has the supplied rid, or None if there is none"""
+        # Look in the given transaction first, then fall back to every transaction in the map
+        if xid in self.txn_map:
+            for txn_op in self.txn_map[xid]:
+                if txn_op[1].record_id == record_id:
+                    return txn_op
+        for this_xid in self.txn_map.iterkeys():
+            for txn_op in self.txn_map[this_xid]:
+                if txn_op[1].record_id == record_id:
+                    return txn_op
+        return None
+
+class Journal(object):
+    """
+    Instance of a Qpid Linear Store (QLS) journal.
+    """
+    class JournalStatistics(object):
+        """Counters that are updated while a journal is recovered"""
+        def __init__(self):
+            # Record totals
+            self.total_record_count = 0
+            self.transient_record_count = 0
+            self.filler_record_count = 0
+            # Non-transaction-specific operation totals
+            self.enqueue_count = 0
+            self.dequeue_count = 0
+            # Transaction totals
+            self.transaction_record_count = 0
+            self.transaction_enqueue_count = 0
+            self.transaction_dequeue_count = 0
+            self.transaction_commit_count = 0
+            self.transaction_abort_count = 0
+            self.transaction_operation_count = 0
+        def __str__(self):
+            """Return all counters, one per line, each line ending in a newline"""
+            template = ''.join(['Total record count: %d\n',
+                                'Transient record count: %d\n',
+                                'Filler_record_count: %d\n',
+                                'Enqueue_count: %d\n',
+                                'Dequeue_count: %d\n',
+                                'Transaction_record_count: %d\n',
+                                'Transaction_enqueue_count: %d\n',
+                                'Transaction_dequeue_count: %d\n',
+                                'Transaction_commit_count: %d\n',
+                                'Transaction_abort_count: %d\n',
+                                'Transaction_operation_count: %d\n'])
+            counters = (self.total_record_count,
+                        self.transient_record_count,
+                        self.filler_record_count,
+                        self.enqueue_count,
+                        self.dequeue_count,
+                        self.transaction_record_count,
+                        self.transaction_enqueue_count,
+                        self.transaction_dequeue_count,
+                        self.transaction_commit_count,
+                        self.transaction_abort_count,
+                        self.transaction_operation_count)
+            return template % counters
+
+    def __init__(self, directory, xid_prepared_list):
+        """
+        Set up an unrecovered journal for the given directory; the queue name is the directory's base name.
+        xid_prepared_list is the list of xids consulted by reconcile_transactions().
+        """
+        self.directory = directory
+        self.queue_name = os.path.basename(directory)
+        self.files = {} # file_num -> file header, filled in by _analyze_files()
+        self.enq_map = EnqueueMap(self)
+        self.txn_map = TransactionMap(self.enq_map)
+        self.file_itr = None
+        self.current_file_header = None
+        self.first_rec_flag = None
+        self.statistics = Journal.JournalStatistics()
+        self.warnings = []
+        self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only
+    def get_outstanding_transaction_list(self):
+        """Return the xids still held in this journal's transaction map"""
+        outstanding_xids = self.txn_map.get_xid_list()
+        return outstanding_xids
+    def get_queue_name(self):
+        """Return the queue name, which is the base name of the journal directory"""
+        return self.queue_name
+    def recover(self, high_rid_counter):
+        """Analyze the journal's files, then process records until one is invalid or the files run out"""
+        print 'Recovering', self.queue_name #DEBUG
+        self._analyze_files()
+        try:
+            more_records = True
+            while more_records:
+                more_records = self._get_next_record(high_rid_counter)
+        except qls.err.NoMoreFilesInJournalError:
+            pass # There is no further file to read from: recovery is complete
+    def reconcile_transactions(self, high_rid_counter):
+        """
+        Commit each outstanding transaction whose xid is in xid_prepared_list and abort all the others.
+        Does nothing for the TPL instance. high_rid_counter is not used.
+        """
+        if self.xid_prepared_list is not None: # ie This is not the TPL instance
+            print 'Reconcile outstanding prepared transactions:'
+            for xid in self.txn_map.get_xid_list():
+                if xid in self.xid_prepared_list:
+                    print '  Committing', Utils.format_xid(xid)
+                    self.txn_map.commit(xid)
+                else:
+                    print '  Aborting', Utils.format_xid(xid)
+                    self.txn_map.abort(xid)
+    def report(self, print_stats_flag):
+        """Print the statistics (if requested), the enqueue and transaction maps, and a table of journal files"""
+        print 'Journal "%s":' % self.queue_name
+        if print_stats_flag:
+            print str(self.statistics)
+        print self.enq_map.report_str(True, True)
+        print self.txn_map.report_str(True, True)
+        print 'file_num p_no   efp journal_file'
+        print '-------- ---- ----- ------------'
+        for file_num in sorted(self.files): # List in file number order rather than arbitrary dict order
+            file_hdr = self.files[file_num]
+            comment = '<uninitialized>' if file_hdr.file_num == 0 else ''
+            print '%8d %4d %4dk %s %s' % (file_num, file_hdr.partition_num, file_hdr.efp_data_size_kb,
+                                          os.path.basename(file_hdr.file_handle.name), comment)
+        print
+    #--- protected functions ---
+    def _analyze_files(self):
+        """
+        Open every <name>.jrnl file in the journal directory, read its file header and index the header by
+        file number in self.files. The file handle of each indexed file is left open for later reading.
+        """
+        for dir_entry in os.listdir(self.directory):
+            dir_entry_bits = dir_entry.split('.')
+            if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME:
+                fq_file_name = os.path.join(self.directory, dir_entry)
+                file_handle = open(fq_file_name, 'rb') # Headers are read as binary data
+                args = Utils.load_args(file_handle, RecordHeader)
+                file_hdr = FileHeader(*args)
+                file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader))
+                if not file_hdr.is_valid(file_hdr):
+                    # Skip only this file. Breaking out of the loop dropped whichever files os.listdir()
+                    # happened to return after it, and left this handle open.
+                    file_handle.close()
+                    continue
+                file_hdr.load(file_handle)
+                Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE)
+                self.files[file_hdr.file_num] = file_hdr
+        # Iterate in file number order; plain dict iteration order is arbitrary
+        self.file_itr = iter(sorted(self.files))
+    def _check_file(self):
+        """Move to the first record of the next file if no file is current or the current file is at its end"""
+        if self.current_file_header is not None and not self.current_file_header.is_end_of_file():
+            return
+        self._get_next_file()
+        self.current_file_header.file_handle.seek(self.current_file_header.first_record_offset)
+    def _get_next_file(self):
+        """
+        Close the current file and make the next file with a non-zero file number current.
+        Raises NoMoreFilesInJournalError when the file iterator is exhausted.
+        """
+        if self.current_file_header is not None:
+            if not self.current_file_header.file_handle.closed: # sanity check, should not be necessary
+                self.current_file_header.file_handle.close()
+        file_num = 0
+        try:
+            while file_num == 0: # Files numbered 0 are passed over
+                file_num = self.file_itr.next()
+        except StopIteration:
+            pass
+        if file_num == 0:
+            raise qls.err.NoMoreFilesInJournalError(self.queue_name)
+        self.current_file_header = self.files[file_num]
+        self.first_rec_flag = True
+        print self.current_file_header
+        #print '[file_num=0x%x]' % self.current_file_header.file_num #DEBUG
+    def _get_next_record(self, high_rid_counter):
+        """
+        Read the next record from the current file and dispatch it according to its type.
+        Returns False if the record read is not valid for the current file, True otherwise.
+        Raises FirstRecordOffsetMismatchError if the first record of a file is not at the offset given
+        in the file header.
+        """
+        self._check_file()
+        this_record = Utils.load(self.current_file_header.file_handle, RecordHeader)
+        if not this_record.is_valid(self.current_file_header):
+            return False
+        if self.first_rec_flag:
+            if this_record.file_offset != self.current_file_header.first_record_offset:
+                raise qls.err.FirstRecordOffsetMismatchError(self.current_file_header, this_record)
+            self.first_rec_flag = False
+        high_rid_counter.check(this_record.record_id)
+        self.statistics.total_record_count += 1
+        if isinstance(this_record, EnqueueRecord):
+            self._handle_enqueue_record(this_record)
+            print this_record
+        elif isinstance(this_record, DequeueRecord):
+            self._handle_dequeue_record(this_record)
+            print this_record
+        elif isinstance(this_record, TransactionRecord):
+            self._handle_transaction_record(this_record)
+            print this_record
+        else:
+            # Anything else is counted as a filler record
+            self.statistics.filler_record_count += 1
+        Utils.skip(self.current_file_header.file_handle, Utils.DBLK_SIZE)
+        return True
+    def _handle_enqueue_record(self, enqueue_record):
+        """
+        Finish loading an enqueue record and account for it.
+        The record is loaded from the current file and, for as long as load() reports it incomplete, from the
+        files that follow. An external record that nevertheless carries data raises
+        qls.err.ExternalDataError. Transient records are only counted. All other records are added to the
+        transaction map when they have an XID, or to the enqueue map when they do not.
+        """
+        while enqueue_record.load(self.current_file_header.file_handle):
+            self._get_next_file()
+        if enqueue_record.is_external() and enqueue_record.data is not None:
+            raise qls.err.ExternalDataError(self.current_file_header, enqueue_record)
+        if enqueue_record.is_transient():
+            self.statistics.transient_record_count += 1
+            return
+        if not enqueue_record.xid_size > 0:
+            self.enq_map.add(self.current_file_header, enqueue_record, False)
+        else:
+            self.txn_map.add(self.current_file_header, enqueue_record)
+            self.statistics.transaction_operation_count += 1
+            self.statistics.transaction_record_count += 1
+            self.statistics.transaction_enqueue_count += 1
+        self.statistics.enqueue_count += 1
+    def _handle_dequeue_record(self, dequeue_record):
+        """
+        Finish loading a dequeue record and account for it.
+        The record is loaded from the current file and, for as long as load() reports it incomplete, from the
+        files that follow. A record with an XID is added to the transaction map; when self.xid_prepared_list
+        is None (ie this journal is the TPL) it is flagged as belonging to the transaction prepared list
+        first. A record without an XID removes its entry from the enqueue map; an entry that cannot be found
+        is currently ignored.
+        """
+        while dequeue_record.load(self.current_file_header.file_handle):
+            self._get_next_file()
+        if not dequeue_record.xid_size > 0:
+            try:
+                self.enq_map.delete(self.current_file_header, dequeue_record)
+            except qls.err.RecordIdNotFoundError:
+                pass # TODO: handle missing enqueue warning here
+        else:
+            if self.xid_prepared_list is None: # ie this is the TPL
+                dequeue_record.transaction_prepared_list_flag = True
+            self.txn_map.add(self.current_file_header, dequeue_record)
+            self.statistics.transaction_operation_count += 1
+            self.statistics.transaction_record_count += 1
+            self.statistics.transaction_dequeue_count += 1
+        self.statistics.dequeue_count += 1
+    def _handle_transaction_record(self, transaction_record):
+        """
+        Finish loading a transaction record and account for it.
+        The record is loaded from the current file and, for as long as load() reports it incomplete, from the
+        files that follow. A magic ending in 'a' is counted as an abort, anything else as a commit. The XID
+        is then removed from the transaction map; a warning is appended to self.warnings when the XID is not
+        in the map, or when the removal reports mismatched record ids.
+        """
+        while transaction_record.load(self.current_file_header.file_handle):
+            self._get_next_file()
+        is_abort = transaction_record.magic[-1] == 'a'
+        if is_abort:
+            self.statistics.transaction_abort_count += 1
+        else:
+            self.statistics.transaction_commit_count += 1
+        if not self.txn_map.contains(transaction_record.xid):
+            self.warnings.append('WARNING: %s not found in transaction map' %
+                                 Utils.format_xid(transaction_record.xid))
+        else:
+            mismatched_rids = self.txn_map.delete(self.current_file_header, transaction_record)
+            if mismatched_rids is not None and len(mismatched_rids) > 0:
+                self.warnings.append('WARNING: transactional dequeues not found in enqueue map; rids=%s' %
+                                     mismatched_rids)
+        self.statistics.transaction_record_count += 1
+    def _load_data(self, record):
+        """
+        Keep loading record until it reports itself complete.
+        record must provide is_complete() and load(file_handle), as RecordTail does. Each time a load from
+        the current file leaves the record incomplete, the next journal file is selected with
+        _get_next_file(), in the same way as the _handle_*_record() loops; that call raises
+        qls.err.NoMoreFilesInJournalError once the journal has no further files.
+        """
+        # is_complete is a method and must be called: the bare attribute is always true
+        while not record.is_complete():
+            record.load(self.current_file_header.file_handle)
+            if not record.is_complete():
+                self._get_next_file()
+
+class RecordHeader(object):
+    """Header fields shared by every journal record; the base class of the specific record classes"""
+    FORMAT = '<4s2H2Q'
+    def __init__(self, file_offset, magic, version, user_flags, serial, record_id):
+        """Store the file offset of the header together with its unpacked FORMAT fields"""
+        self.file_offset, self.magic = file_offset, magic
+        self.version, self.user_flags = version, user_flags
+        self.serial, self.record_id = serial, record_id
+    def load(self, file_handle):
+        """Nothing follows a plain header, so there is nothing to load"""
+        pass
+    @staticmethod
+    def discriminate(args):
+        """Use the last char in the header magic to determine the header type"""
+        record_type = args[1][-1]
+        return _CLASSES.get(record_type, RecordHeader)
+    def is_empty(self):
+        """Return True if this record is empty (ie has a magic of 0x0000)"""
+        return self.magic == '\x00'*4
+    def is_valid(self, file_header):
+        """
+        Check that this record is valid.
+        Returns False for an empty record, for an unknown magic, and for a record whose serial differs from
+        that of file_header. Records with a magic ending in 'x' are checked for their magic only. Raises
+        qls.err.InvalidRecordVersionError when the version is not Utils.RECORD_VERSION.
+        """
+        if self.is_empty():
+            return False
+        if self.magic[:3] != 'QLS' or self.magic[3] not in ('a', 'c', 'd', 'e', 'f', 'x'):
+            return False
+        if self.magic[-1] == 'x':
+            return True
+        if self.version != Utils.RECORD_VERSION:
+            raise qls.err.InvalidRecordVersionError(file_header, self, Utils.RECORD_VERSION)
+        return self.serial == file_header.serial
+    def __str__(self):
+        """Return string representation of this header"""
+        if self.is_empty():
+            return '0x%08x: <empty>' % (self.file_offset)
+        record_type = self.magic[-1]
+        if record_type == 'x':
+            return '0x%08x: [X]' % (self.file_offset)
+        if record_type in ('a', 'c', 'd', 'e', 'f'):
+            return '0x%08x: [%c v=%d f=0x%04x rid=0x%x]' % \
+                (self.file_offset, record_type.upper(), self.version, self.user_flags, self.record_id)
+        return '0x%08x: <error, unknown magic "%s" (possible overwrite boundary?)>' %  (self.file_offset, self.magic)
+
+class RecordTail(object):
+    """
+    Tail that closes a record: magic, checksum, serial and record id, packed as FORMAT.
+    The tail may be split between two journal files. The constructor reads what it can from the handle it
+    is given; load() then continues from another handle until is_complete() returns True.
+    """
+    FORMAT = '<4sL2Q'
+    def __init__(self, file_handle):
+        """Read up to one tail from file_handle, remembering the offset at which the read started"""
+        self.file_offset = file_handle.tell()
+        self.complete = False
+        # The fields stay None until all bytes of the tail have been read
+        self.xmagic = self.checksum = self.serial = self.record_id = None
+        self.read_size = struct.calcsize(RecordTail.FORMAT)
+        self.fbin = file_handle.read(self.read_size)
+        self._unpack_if_complete()
+    def load(self, file_handle):
+        """Used to continue load of RecordTail object if it is split between files"""
+        # is_complete is a method and must be called: testing the bare attribute is always true, which
+        # made this continuation a no-op
+        if not self.is_complete():
+            self.fbin += file_handle.read(self.read_size - len(self.fbin))
+            self._unpack_if_complete()
+    def is_complete(self):
+        """Return True once all bytes of the tail have been read and unpacked"""
+        return self.complete
+    def _unpack_if_complete(self):
+        """Unpack the tail fields as soon as read_size bytes are available"""
+        if len(self.fbin) >= self.read_size:
+            self.complete = True
+            self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin)
+    def __str__(self):
+        """Return a string representation of the this RecordTail instance"""
+        if not self.complete:
+            # There are no fields to show yet
+            return '[<incomplete tail>]'
+        magic = Utils.inv_str(self.xmagic)
+        return '[%c cs=0x%x rid=0x%x]' % (magic[-1].upper(), self.checksum, self.record_id)
+
+class FileHeader(RecordHeader):
+    """Header of a journal file: the RecordHeader fields followed by the FORMAT fields and the queue name"""
+    FORMAT = '<2H4x5QH'
+    def init(self, file_handle, file_offset, file_header_size_sblks, partition_num, efp_data_size_kb,
+             first_record_offset, timestamp_sec, timestamp_ns, file_num, queue_name_len):
+        """Store the open file handle and the unpacked FORMAT fields; file_offset is not used"""
+        self.file_handle = file_handle
+        self.file_num = file_num
+        self.partition_num = partition_num
+        self.file_header_size_sblks = file_header_size_sblks
+        self.efp_data_size_kb = efp_data_size_kb
+        self.first_record_offset = first_record_offset
+        self.timestamp_sec = timestamp_sec
+        self.timestamp_ns = timestamp_ns
+        self.queue_name_len = queue_name_len
+        self.queue_name = None
+    def load(self, file_handle):
+        """Read the queue name (queue_name_len bytes) from file_handle"""
+        self.queue_name = file_handle.read(self.queue_name_len)
+    def get_file_size(self):
+        """Sum of file header size and data size"""
+        header_bytes = self.file_header_size_sblks * Utils.SBLK_SIZE
+        data_bytes = self.efp_data_size_kb * 1024
+        return header_bytes + data_bytes
+    def is_end_of_file(self):
+        """Return True when the file handle is at or past the size given by get_file_size()"""
+        return self.file_handle.tell() >= self.get_file_size()
+    def timestamp_str(self):
+        """Get the timestamp of this record in string format"""
+        utc_time = gmtime(self.timestamp_sec)
+        # The nanoseconds are substituted first; the doubled percent signs survive for strftime()
+        fstr = '%%a %%b %%d %%H:%%M:%%S.%09d %%Y' % (self.timestamp_ns)
+        return strftime(fstr, utc_time)
+    def __str__(self):
+        """Return a string representation of the this FileHeader instance"""
+        fields = (RecordHeader.__str__(self), self.file_num, self.first_record_offset,
+                  self.partition_num, self.efp_data_size_kb, self.timestamp_str())
+        return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s' % fields
+
+class EnqueueRecord(RecordHeader):
+    """
+    Enqueue record: the RecordHeader fields followed by the XID size and data size (FORMAT), then the XID,
+    the data (not read for external records) and a RecordTail (read only when either size is non-zero).
+    """
+    FORMAT = '<2Q'
+    EXTERNAL_FLAG_MASK = 0x20
+    TRANSIENT_FLAG_MASK = 0x10
+    def init(self, file_offset, xid_size, data_size):
+        """Store the unpacked FORMAT fields and reset the load state; file_offset is not used"""
+        self.xid_size = xid_size
+        self.data_size = data_size
+        self.xid = None
+        self.xid_complete = False
+        self.data = None
+        self.data_complete = False
+        self.record_tail = None
+    def is_external(self):
+        """Return True when the external flag is set in the user flags"""
+        return (self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK) > 0
+    def is_transient(self):
+        """Return True when the transient flag is set in the user flags"""
+        return (self.user_flags & EnqueueRecord.TRANSIENT_FLAG_MASK) > 0
+    def load(self, file_handle):
+        """Return True when load is incomplete and must be called again with new file handle"""
+        self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
+        if not self.xid_complete:
+            return True
+        if self.is_external():
+            # The data of an external record is not read from the journal
+            self.data_complete = True
+        else:
+            self.data, self.data_complete = Utils.load_data(file_handle, self.data, self.data_size)
+            if not self.data_complete:
+                return True
+        if self.xid_size > 0 or self.data_size > 0:
+            if self.record_tail is None:
+                self.record_tail = RecordTail(file_handle)
+            elif not self.record_tail.is_complete():
+                self.record_tail.load(file_handle) # Continue loading partially loaded tail
+            return not self.record_tail.is_complete()
+        return False
+    def _print_flags(self):
+        """
+        Utility function to decode the flags field in the header and return a string representation:
+        '' when no flag is set, otherwise '[TRANSIENT]', '[EXTERNAL]' or '[TRANSIENT,EXTERNAL]'
+        """
+        names = []
+        if self.is_transient():
+            names.append('TRANSIENT')
+        if self.is_external():
+            names.append('EXTERNAL')
+        if not names:
+            return ''
+        # An external-only record used to come out as the malformed '*EXTERNAL]'
+        return '[%s]' % ','.join(names)
+    def __str__(self):
+        """Return a string representation of the this EnqueueRecord instance"""
+        if self.record_tail is None:
+            record_tail_str = ''
+        else:
+            record_tail_str = str(self.record_tail)
+        return '%s %s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
+                                   Utils.format_data(self.data_size, self.data), record_tail_str, self._print_flags())
+
+class DequeueRecord(RecordHeader):
+    """
+    Dequeue record: the RecordHeader fields followed by the dequeued record id and the XID size (FORMAT),
+    then the XID and a RecordTail (both present only when the XID size is non-zero).
+    """
+    FORMAT = '<2Q'
+    TXN_COMPLETE_COMMIT_FLAG = 0x10
+    def init(self, file_offset, dequeue_record_id, xid_size):
+        """Store the unpacked FORMAT fields and reset the load state; file_offset is not used"""
+        self.dequeue_record_id, self.xid_size = dequeue_record_id, xid_size
+        self.transaction_prepared_list_flag = False
+        self.xid, self.xid_complete = None, False
+        self.record_tail = None
+    def is_transaction_complete_commit(self):
+        """Return True when the transaction-complete-commit flag is set in the user flags"""
+        return (self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG) > 0
+    def load(self, file_handle):
+        """Return True when load is incomplete and must be called again with new file handle"""
+        self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
+        if not self.xid_complete:
+            return True
+        if not self.xid_size > 0:
+            # Without an XID there is no tail to read
+            return False
+        if self.record_tail is None:
+            self.record_tail = RecordTail(file_handle)
+        elif not self.record_tail.is_complete():
+            self.record_tail.load(file_handle)
+        return not self.record_tail.is_complete()
+    def _print_flags(self):
+        """Utility function to decode the flags field in the header and return a string representation"""
+        if not self.transaction_prepared_list_flag:
+            return ''
+        if self.is_transaction_complete_commit():
+            return '[COMMIT]'
+        return '[ABORT]'
+    def __str__(self):
+        """Return a string representation of the this DequeueRecord instance"""
+        record_tail_str = '' if self.record_tail is None else str(self.record_tail)
+        return '%s %s drid=0x%x %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
+                                          self.dequeue_record_id, record_tail_str, self._print_flags())
+
+class TransactionRecord(RecordHeader):
+    """
+    Transaction (abort or commit) record: the RecordHeader fields followed by the XID size (FORMAT), then
+    the XID and a RecordTail (both present only when the XID size is non-zero).
+    """
+    FORMAT = '<Q'
+    def init(self, file_offset, xid_size):
+        """Store the unpacked FORMAT field and reset the load state; file_offset is not used"""
+        self.xid_size = xid_size
+        self.xid, self.xid_complete = None, False
+        self.record_tail = None
+    def load(self, file_handle):
+        """Return True when load is incomplete and must be called again with new file handle"""
+        self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
+        if not self.xid_complete:
+            return True
+        if not self.xid_size > 0:
+            # Without an XID there is no tail to read
+            return False
+        if self.record_tail is None:
+            self.record_tail = RecordTail(file_handle)
+        elif not self.record_tail.is_complete():
+            self.record_tail.load(file_handle)
+        return not self.record_tail.is_complete()
+    def __str__(self):
+        """Return a string representation of the this TransactionRecord instance"""
+        record_tail_str = '' if self.record_tail is None else str(self.record_tail)
+        return '%s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), record_tail_str)
+
+class Utils(object):
+    """Class containing utility functions for dealing with the journal"""
+    DBLK_SIZE = 128
+    RECORD_VERSION = 2
+    SBLK_SIZE = 4096
+    __printchars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
+    @staticmethod
+    def format_data(dsize, data):
+        """
+        Format binary data for printing.
+        Returns '' when data is None, otherwise 'data(<dsize>)="<text>"' where the text is the data itself
+        (abbreviated by _split_str) if it is printable, or its hex form otherwise. Raises
+        qls.err.DataSizeError when dsize does not match the length of data.
+        """
+        if data is None:
+            return ''
+        if Utils._is_printable(data):
+            datastr = Utils._split_str(data)
+        else:
+            datastr = Utils._hex_split_str(data)
+        if dsize != len(data):
+            raise qls.err.DataSizeError(dsize, len(data), datastr)
+        return 'data(%d)="%s"' % (dsize, datastr)
+    @staticmethod
+    def format_xid(xid, xidsize=None):
+        """
+        Format binary XID for printing.
+        Returns '' when xid is None, otherwise 'xid(<size>)=<text>' where the text is the quoted XID if it
+        is printable, or '0x' followed by its hex form otherwise. Raises qls.err.XidSizeError when xidsize
+        is given and does not match the XID (a missing XID counts as length 0).
+        """
+        if xid is None:
+            # Also covers a missing XID with no expected size, which used to fail inside _is_printable()
+            if xidsize is not None and xidsize > 0:
+                raise qls.err.XidSizeError(xidsize, 0, None)
+            return ''
+        if Utils._is_printable(xid):
+            xidstr = '"%s"' % Utils._split_str(xid)
+        else:
+            xidstr = '0x%s' % Utils._hex_split_str(xid)
+        if xidsize is None:
+            xidsize = len(xid)
+        elif xidsize != len(xid):
+            raise qls.err.XidSizeError(xidsize, len(xid), xidstr)
+        return 'xid(%d)=%s' % (xidsize, xidstr)
+    @staticmethod
+    def inv_str(in_string):
+        """Perform a binary 1's complement (invert all bits) on a binary string"""
+        return ''.join(chr(~ord(char) & 0xff) for char in in_string)
+    @staticmethod
+    def load(file_handle, klass):
+        """
+        Load a record of class klass from a file.
+        The fields of klass are read first; klass.discriminate() then selects the class to instantiate.
+        When that is a different class, its own FORMAT fields are read too and passed to its init().
+        """
+        args = Utils.load_args(file_handle, klass)
+        subclass = klass.discriminate(args)
+        result = subclass(*args) # create instance of record
+        if subclass != klass:
+            result.init(*Utils.load_args(file_handle, subclass))
+        return result
+    @staticmethod
+    def load_args(file_handle, klass):
+        """
+        Load the arguments from class klass.
+        Returns a tuple holding the file offset at which the read started followed by the fields unpacked
+        with klass.FORMAT. Raises qls.err.UnexpectedEndOfFileError when fewer bytes are available.
+        """
+        size = struct.calcsize(klass.FORMAT)
+        foffs = file_handle.tell(), # one-element tuple, so that it can be joined to the unpacked fields
+        fbin = file_handle.read(size)
+        if len(fbin) != size:
+            raise qls.err.UnexpectedEndOfFileError(len(fbin), size, foffs, file_handle.name)
+        return foffs + struct.unpack(klass.FORMAT, fbin)
+    @staticmethod
+    def load_data(file_handle, element, element_size):
+        """
+        Read, or continue reading, element_size bytes into element.
+        element is None on the first call, or the bytes read so far on a later call. Returns a tuple of the
+        bytes accumulated so far and a flag which is True once element_size bytes are present. Nothing is
+        read when element_size is 0.
+        """
+        if element_size == 0:
+            return element, True
+        if element is None:
+            element = file_handle.read(element_size)
+        else:
+            read_size = element_size - len(element)
+            element += file_handle.read(read_size)
+        return element, len(element) == element_size
+    @staticmethod
+    def skip(file_handle, boundary):
+        """Read and discard disk bytes until the next multiple of boundary"""
+        file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary))
+    #--- protected functions ---
+    @staticmethod
+    def _hex_str(in_str, begin, end):
+        """Return in_str[begin:end] with every non-printable char replaced by a backslash and its hex code"""
+        pieces = []
+        for index in range(begin, end):
+            if Utils._is_printable(in_str[index]):
+                pieces.append(in_str[index])
+            else:
+                pieces.append('\\%02x' % ord(in_str[index]))
+        return ''.join(pieces)
+    @staticmethod
+    def _hex_split_str(in_str, split_size = 50):
+        """
+        Return the chars of in_str, taken in reverse order, as one hex string.
+        NOTE: split_size is currently unused: the string is not abbreviated.
+        """
+        return ''.join(x.encode('hex') for x in reversed(in_str))
+    @staticmethod
+    def _is_printable(in_str):
+        """Return True if in_str in printable; False otherwise."""
+        return in_str.strip(Utils.__printchars) == ''
+    @staticmethod
+    def _rem_bytes_in_block(file_handle, block_size):
+        """Return the remaining bytes in a block"""
+        foffs = file_handle.tell()
+        return (Utils._size_in_blocks(foffs, block_size) * block_size) - foffs
+    @staticmethod
+    def _size_in_blocks(size, block_size):
+        """Return the size in terms of data blocks"""
+        # Floor division keeps this exact and independent of the division mode in force
+        return (size + block_size - 1) // block_size
+    @staticmethod
+    def _split_str(in_str, split_size = 50):
+        """Split a string into two parts separated by an ellipsis if it is longer than split_size"""
+        if len(in_str) <= split_size:
+            return in_str
+        half = split_size // 2
+        return in_str[:half] + ' ... ' + in_str[len(in_str) - half:]
+
+# =============================================================================
+
+# Maps the last character of a record magic to the class used to load that record. Characters that
+# are not listed here are loaded as a plain RecordHeader (see RecordHeader.discriminate).
+_CLASSES = {
+    'a': TransactionRecord,
+    'c': TransactionRecord,
+    'd': DequeueRecord,
+    'e': EnqueueRecord,
+}
+
+# This module is meant to be imported; running it directly only prints a notice.
+if __name__ == '__main__':
+    print 'This is a library, and cannot be executed.'

Added: qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py?rev=1556888&view=auto
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py (added)
+++ qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py Thu Jan  9 17:19:59 2014
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import os
+import os.path
+from qls import efp
+from qls import jrnl
+
+class QqpdLinearStoreAnalyzer(object):
+    """
+    Top-level store analyzer. Will analyze the directory in args.qls_dir as the top-level Qpid Linear Store
+    (QLS) directory. The following may be analyzed:
+    * The Empty File Pool (if --efp is specified in the arguments)
+    * The Linear Store
+    * The Transaction Prepared List (TPL)
+    """
+    QLS_ANALYZE_VERSION = '0.1'
+    def __init__(self):
+        """Parse the command line, then create the EFP and journal recovery managers for the store"""
+        self.args = None
+        self._process_args()
+        self.efp_manager = efp.EfpManager(self.args.qls_dir)
+        self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.args.qls_dir)
+    def _analyze_efp(self):
+        """Run the Empty File Pool manager with the parsed arguments"""
+        self.efp_manager.run(self.args)
+    def _analyze_journals(self):
+        """Run the journal recovery manager with the parsed arguments"""
+        self.jrnl_recovery_mgr.run(self.args)
+    def _process_args(self):
+        """
+        Parse the command line into self.args.
+        Exits through the argument parser's error handling when the store path does not exist or is not a
+        directory.
+        """
+        parser = argparse.ArgumentParser(description = 'Qpid Linear Store Analyzer')
+        parser.add_argument('qls_dir', metavar='DIR',
+                            help='Qpid Linear Store (QLS) directory to be analyzed')
+        parser.add_argument('--efp', action='store_true',
+                            help='Analyze the Empty File Pool (EFP) and show stats')
+        parser.add_argument('--stats', action='store_true',
+                            help='Print journal record stats')
+        parser.add_argument('--txn', action='store_true',
+                            help='Reconcile incomplete transactions')
+        parser.add_argument('--version', action='version', version='%(prog)s ' +
+                            QqpdLinearStoreAnalyzer.QLS_ANALYZE_VERSION)
+        self.args = parser.parse_args()
+        if not os.path.exists(self.args.qls_dir):
+            parser.error('Journal path "%s" does not exist' % self.args.qls_dir)
+        # The store is a directory; reject a plain file here rather than failing later during analysis
+        if not os.path.isdir(self.args.qls_dir):
+            parser.error('Journal path "%s" is not a directory' % self.args.qls_dir)
+    def report(self):
+        """Print the EFP report (only when --efp was given) followed by the journal report"""
+        if self.args.efp:
+            self.efp_manager.report()
+        self.jrnl_recovery_mgr.report(self.args.stats)
+    def run(self):
+        """Analyze the EFP (only when --efp was given) and then the journals"""
+        if self.args.efp:
+            self._analyze_efp()
+        self._analyze_journals()
+
+#==============================================================================
+# main program
+#==============================================================================
+
+# Script entry point: print a work-in-progress warning, then analyze the store and report on it.
+if __name__ == "__main__":
+    # TODO: Remove this in due course
+    print 'WARNING: This program is still a work in progress and is largely 
untested.'
+    print '* USE AT YOUR OWN RISK *'
+    print
+    # Constructing the analyzer parses the command line and creates the EFP and journal managers
+    M = QqpdLinearStoreAnalyzer()
+    # All analysis is done by run(); report() then prints the results
+    M.run()
+    M.report()

Propchange: qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to