Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h?rev=1516958&r1=1516957&r2=1516958&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h (original) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/jrec.h Fri Aug 23 18:07:15 2013 @@ -150,7 +150,7 @@ namespace qls_jrnl static inline uint32_t size_dblks(const std::size_t size) { return size_blks(size, JRNL_DBLK_SIZE); } static inline uint32_t size_sblks(const std::size_t size) - { return size_blks(size, JRNL_DBLK_SIZE * JRNL_SBLK_SIZE); } + { return size_blks(size, JRNL_SBLK_SIZE); } static inline uint32_t size_blks(const std::size_t size, const std::size_t blksize) { return (size + blksize - 1)/blksize; } virtual uint64_t rid() const = 0;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp?rev=1516958&r1=1516957&r2=1516958&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp (original) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/pmgr.cpp Fri Aug 23 18:07:15 2013 @@ -38,6 +38,7 @@ namespace qls_jrnl pmgr::page_cb::page_cb(uint16_t index): _index(index), _state(UNUSED), + _frid(0), _wdblks(0), _rdblks(0), _pdtokl(0), @@ -63,7 +64,7 @@ pmgr::page_cb::state_str() const return "<unknown>"; } -const uint32_t pmgr::_sblksize = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE; +const uint32_t pmgr::_sblksize = JRNL_SBLK_SIZE; pmgr::pmgr(jcntl* jc, enq_map& emap, txn_map& tmap): _cache_pgsize_sblks(0), Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp?rev=1516958&r1=1516957&r2=1516958&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp (original) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/rmgr.cpp Fri Aug 23 18:07:15 2013 @@ -266,7 +266,7 @@ rmgr::get_events(page_state /*state*/, t { // TODO: replace for linear store: _rfh /* - if (pcbp->_rfh->rd_subm_cnt_dblks() >= JRNL_SBLK_SIZE) // Detects if write reset of this fcntl obj has occurred. + if (pcbp->_rfh->rd_subm_cnt_dblks() >= JRNL_SBLK_SIZE_DBLKS) // Detects if write reset of this fcntl obj has occurred. { // Increment the completed read offset // NOTE: We cannot use _rrfc here, as it may have rotated since submitting count. @@ -281,15 +281,15 @@ rmgr::get_events(page_state /*state*/, t else // File header reads have no pcb { std::memcpy(&_fhdr, _fhdr_buffer, sizeof(file_hdr_t)); - /*_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE);*/ // TODO: replace for linear store: _rrfc + /*_rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS);*/ // TODO: replace for linear store: _rrfc - uint32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE; + uint32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE_DBLKS; // Check fro_dblks does not exceed the write pointers which can happen in some corrupted journal recoveries // TODO: replace for linear store: _fhdr._pfid, _rrfc -// if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE) -// fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE; - _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE); - uint32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE; +// if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE_DBLKS) +// fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE_DBLKS; + _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS); + uint32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS; _pg_index = _pg_cntr % JRNL_RMGR_PAGES; _pg_offset_dblks = fro_dblks - tot_pg_offs_dblks; // _rrfc.add_subm_cnt_dblks(tot_pg_offs_dblks); @@ -601,16 +601,16 @@ rmgr::init_aio_reads(const int16_t /*fir if (_rrfc.subm_offs() == 0) { - _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE); - _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE); + _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); + _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); } // TODO: Future perf improvement: Do a single AIO read for all available file // space into all contiguous empty pages in one AIO operation. uint32_t file_rem_dblks = _rrfc.remaining_dblks(); - file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE; // round down to closest sblk boundary - uint32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE; + file_rem_dblks -= file_rem_dblks % JRNL_SBLK_SIZE_DBLKS; // round down to closest sblk boundary + uint32_t pg_size_dblks = JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS; uint32_t rd_size = file_rem_dblks > pg_size_dblks ? pg_size_dblks : file_rem_dblks; if (rd_size) { @@ -618,7 +618,7 @@ rmgr::init_aio_reads(const int16_t /*fir // TODO: For perf, combine contiguous pages into single read // 1 or 2 AIOs needed depending on whether read block folds aio_cb* aiocbp = &_aio_cb_arr[pi]; - aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE, _rrfc.subm_offs()); + aio::prep_pread_2(aiocbp, _rrfc.fh(), _page_ptr_arr[pi], rd_size * JRNL_DBLK_SIZE_DBLKS, _rrfc.subm_offs()); if (aio::submit(_ioctx, 1, &aiocbp) < 0) throw jexception(jerrno::JERR__AIO, "rmgr", "init_aio_reads"); _rrfc.add_subm_cnt_dblks(rd_size); @@ -640,7 +640,7 @@ rmgr::rotate_page() { _page_cb_arr[_pg_index]._rdblks = 0; _page_cb_arr[_pg_index]._state = UNUSED; - if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE) + if (_pg_offset_dblks >= JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE_DBLKS) { _pg_offset_dblks = 0; _pg_cntr++; @@ -682,7 +682,7 @@ rmgr::init_file_header_read() if (aio::submit(_ioctx, 1, &_fhdr_aio_cb_ptr) < 0) throw jexception(jerrno::JERR__AIO, "rmgr", "init_file_header_read"); _aio_evt_rem++; - _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE); + _rrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); _fhdr_rd_outstanding = true; */ } Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp?rev=1516958&r1=1516957&r2=1516958&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp (original) +++ qpid/branches/linearstore/qpid/cpp/src/qpid/linearstore/jrnl/wmgr.cpp Fri Aug 23 18:07:15 2013 @@ -94,13 +94,13 @@ wmgr::initialize(aio_callback* const cbp initialize(cbp, wcache_pgsize_sblks, wcache_num_pages); - _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE; + _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE_DBLKS; _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks; assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0); if (eo) { - const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE; + const uint32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS; uint32_t data_dblks = (eo / JRNL_DBLK_SIZE) - 4; // 4 dblks for file hdr _pg_cntr = data_dblks / wr_pg_size_dblks; _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks); @@ -154,11 +154,11 @@ wmgr::enqueue(const void* const data_buf bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _enq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files // TODO: replace for linearstore: _wrfc @@ -259,11 +259,11 @@ wmgr::dequeue(data_tok* dtokp, const voi bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _deq_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files // TODO: replace for linearstore: _wrfc @@ -361,11 +361,11 @@ wmgr::abort(data_tok* dtokp, const void* bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files // TODO: replace for linearstore: _wrfc @@ -453,11 +453,11 @@ wmgr::commit(data_tok* dtokp, const void bool done = false; while (!done) { - assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE); + assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS); void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); uint32_t data_offs_dblks = dtokp->dblocks_written(); uint32_t ret = _txn_rec.encode(wptr, data_offs_dblks, - (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks); + (_cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) - _pg_offset_dblks); // Remember fid which contains the record header in case record is split over several files // TODO: replace for linearstore: _wrfc @@ -545,10 +545,10 @@ wmgr::file_header_check(const uint64_t / if (cont) { if (file_fit && !file_full) - fro = (rec_dblks_rem + JRNL_SBLK_SIZE) * JRNL_DBLK_SIZE; + fro = (rec_dblks_rem + JRNL_SBLK_SIZE_DBLKS) * JRNL_DBLK_SIZE; } else - fro = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE; + fro = JRNL_SBLK_SIZE; write_fhdr(rid, _wrfc.index(), _wrfc.index(), fro); // TODO: replace for linearstore: _wrfc } */ @@ -558,7 +558,7 @@ void wmgr::flush_check(iores& res, bool& cont, bool& done) { // Is page is full, flush - if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE) + if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) { res = write_flush(); assert(res == RHM_IORES_SUCCESS); @@ -814,7 +814,7 @@ wmgr::get_events(page_state state, times file_hdr_t* fhp = (file_hdr*)aiocbp->u.c.buf; uint32_t lfid = fhp->_lfid; fcntl* fcntlp = _jc->get_fcntlp(lfid); - fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE); + fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); fcntlp->decr_aio_cnt(); fcntlp->set_wr_fhdr_aio_outstanding(false); */ @@ -971,7 +971,7 @@ void wmgr::dblk_roundup() { const uint32_t xmagic = QLS_EMPTY_MAGIC; - uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE) * JRNL_SBLK_SIZE; + uint32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE_DBLKS) * JRNL_SBLK_SIZE_DBLKS; while (_cached_offset_dblks < wdblks) { void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE); @@ -1000,7 +1000,7 @@ wmgr::write_fhdr(uint64_t rid, uint16_t _aio_evt_rem++; // TODO: replace for linearstore: _wrfc /* - _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE); + _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE_DBLKS); _wrfc.incr_aio_cnt(); _wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true); */ @@ -1010,7 +1010,7 @@ void wmgr::rotate_page() { _page_cb_arr[_pg_index]._state = AIO_PENDING; - if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE) + if (_pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE_DBLKS) { _pg_offset_dblks = 0; _pg_cntr++; Modified: qpid/branches/linearstore/qpid/tools/src/py/linearstore/efptool.py URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/tools/src/py/linearstore/efptool.py?rev=1516958&r1=1516957&r2=1516958&view=diff ============================================================================== --- qpid/branches/linearstore/qpid/tools/src/py/linearstore/efptool.py (original) +++ qpid/branches/linearstore/qpid/tools/src/py/linearstore/efptool.py Fri Aug 23 18:07:15 2013 @@ -99,7 +99,8 @@ class EfpArgParser(argparse.ArgumentPars 'size it contains in kb, and followed by the letter \'k\'. (eg a pool ' 'containing 160 kb files is named \'160k\'.)') self.add_argument('store_directory', metavar='STORE-DIR', - help='Use store directory DIR to create pool (required)') + help='Use store directory DIR (required), Usually ends with \'qls\' and contains ' + 'partition(s) \'p000\', \'p001\', ...') self.add_argument('-a', '--add', action='store_true', help='Add new pool of NF files sized FS in partition PN. Must be used with --partition, ' '--size and --num-files') @@ -119,6 +120,8 @@ class EfpArgParser(argparse.ArgumentPars self.add_argument('-n', '--num-files', metavar='NF', help='Set the number of files for the add or freshen action to NF') self.add_argument('-v', '--version', action='version', version='%(prog)s 0.8') + self.add_argument('-V', '--verbose', action='store_true', + help='Verbose output, helps with troubleshooting') @staticmethod def validate_args(args): ''' Static validation function which checks that required dependent args are present as well as no mutually @@ -255,9 +258,12 @@ class Partition: return s def validate_efp_directory(self): ''' Check that the partition directory is valid ''' + if os.path.exists(self.efp_directory): + if not os.path.isdir(self.efp_directory): + raise InvalidPartitionDirectoryError + else: + os.mkdir(self.efp_directory) # TODO: Add checks for permissions to write and sufficient space - if not os.path.exists(self.efp_directory) or not os.path.isdir(self.efp_directory): - raise InvalidPartitionDirectoryError def read(self): ''' Read the partition, identifying EFP directories. Read each EFP directory found. ''' for dir_entry in os.listdir(self.efp_directory): @@ -291,10 +297,13 @@ class EmptyFilePoolManager: self.total_num_files = 0 self.total_cum_file_size = 0 self.current_partition = None - print 'Reading store directory', self.store_directory + if self.args.verbose: + print 'Reading store directory', self.store_directory for dir_entry in os.listdir(self.store_directory): if len(dir_entry) == 4 and dir_entry[0] == 'p': pn = int(dir_entry[1:]) + if self.args.verbose: + print ' Found %s (partition %d)' % (dir_entry, pn) try: self.partitions.append(Partition(os.path.join(self.store_directory, dir_entry), pn, DEFAULT_EFP_DIR_NAME)) @@ -342,7 +351,8 @@ class EmptyFilePoolManager: print 'ERROR: --partition/-p: Partition %s does not exist' % self.args.partition valid = False except ValueError: - print 'ERROR: --partition/-p: Partition %s does not exist' % self.args.partition + print 'ERROR: --partition/-p: \'%s\': Invalid partition name (must be pNNN, where NNN is the ' \ + 'partition number)' % self.args.partition valid = False if not self.current_partition is None: if self.args.add: --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
