Hi, Petr,

Attached is a modification of the file tests/setqgen.cpp that mimics your use case. So far, it seems to produce exactly the same output set (as a whole, not in the individual partitions) as tests/setqgen.cpp. It works OK on my laptop. Would you mind taking a look and seeing if you can make it behave more like your program?
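The "same output set, different partitions" property comes from advancing the row counter and the random generator inside one critical section, which is what fillRow in the attached program does with ibis::util::envLock. Below is a standalone C++11 sketch of that invariant without FastBit. It is an illustration under stated assumptions, not the attached code: SharedRowSource, Row and generateRows are hypothetical names, and std::mutex stands in for FastBit's lock.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using Row = std::pair<std::uint64_t, std::uint32_t>;  // (sequence number, random value)

// Hypothetical stand-in for fillRow(): one mutex guards both the row counter
// and the generator state, so each call is a single indivisible step.
class SharedRowSource {
public:
    Row next() {
        std::lock_guard<std::mutex> guard(mtx_);
        ++seq_;
        seed_ = (seed_ * 16807u) % 2147483647u;  // same recurrence as setqrand
        return Row(seq_, static_cast<std::uint32_t>(seed_));
    }

private:
    std::mutex mtx_;
    std::uint64_t seq_ = 0;
    std::uint64_t seed_ = 1;
};

// Draws rows 1..total with `nthreads` threads, each filling its own
// "partition", then returns all rows merged and sorted by sequence number.
std::vector<Row> generateRows(unsigned nthreads, std::uint64_t total) {
    assert(nthreads > 0);
    SharedRowSource src;
    std::vector<std::vector<Row>> parts(nthreads);
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < nthreads; ++t) {
        workers.emplace_back([&, t] {
            for (;;) {
                Row row = src.next();
                if (row.first > total) break;  // past the requested row count
                parts[t].push_back(row);
            }
        });
    }
    for (auto& w : workers) w.join();

    std::vector<Row> all;
    for (const auto& p : parts) all.insert(all.end(), p.begin(), p.end());
    std::sort(all.begin(), all.end());
    return all;
}
```

generateRows(4, 1000) and generateRows(1, 1000) should return identical vectors, even though which thread received which row varies from run to run. That variation mirrors how individual partitions differ while the union does not.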
Thanks.

John

On Thu, Aug 30, 2012 at 11:45 PM, Petr Velan <[email protected]> wrote:
> Hi John,
>
> The memory management in FastBit is good for batch mode of operation;
> however, I need to reduce the memory footprint, since there might be
> other operations that require a large amount of memory for a short
> time, so FastBit cannot keep that memory allocated all the time.
>
> I know that there is a limit that allows FastBit to use at most half of
> the available memory and that it can be changed. I think it would be a
> good thing to have two limits: one to set the maximum that can be used
> and another that would trigger the unload function. I would then set
> FastBit to keep 0.5GB ready and allow it to expand to 10GB. So when
> needed, FastBit would use up to 10GB of memory, but afterwards it
> would free it and keep only 0.5GB for further use. The default could
> still be to have both limits at half of the available memory. What do
> you think?
>
> We are currently trying to manually call
> ibis::fileManager::instance().flushDir() to see if it helps to keep
> the memory down, but I believe that the solution I described earlier
> is much more generic.
>
> Unfortunately, I do not have any simple code to reproduce the
> deadlock. I'll try to look into it, maybe compile FastBit with
> debugging symbols to help us better understand what is going on.
>
> Petr
>
> On 30 August 2012 20:29, K. John Wu <[email protected]> wrote:
>> Hi, Petr,
>>
>> Thanks for clarifying the use case. It looks like you cannot wait for
>> everything to be done before releasing the ibis::part objects.
>> Regarding the memory usage, FastBit does lazy deletions - as long as
>> no one needs new memory, the existing content read from files will be
>> kept in memory. The default maximum memory to be used is half of
>> the physical memory - which explains what you've observed. Once
>> reaching that limit, ibis::fileManager::unload will be called to
>> remove the content of files that are not in active use. In your case,
>> it sounds like there will be a lot of old files to be removed from memory.
>>
>> Since there is no clear indication which thread is holding on to the
>> mutex lock, we might need to create a multithreaded data generator
>> that can mimic your data ingestion process. If you have a simple one
>> that I can borrow, I would greatly appreciate it.
>>
>> Most likely, another copy of ibis::fileManager::getFile is holding on
>> to the ibis::fileManager::mutex. However, logically, that is not
>> possible because that thread can only be waiting on a condition
>> variable, in which case it should have yielded the mutex lock already.
>> Anyway, something gnarly is going on here.
>>
>> John
>>
>>
>> On 8/29/12 10:54 PM, Petr Velan wrote:
>>> Hi John,
>>>
>>> I still do not understand why there is a deadlock, or why access to
>>> different partitions is managed by the same mutex lock.
>>>
>>> Our use case is this:
>>> We have a process that collects data from the network and stores it in
>>> FastBit partitions. Each partition contains 5 minutes of data,
>>> approximately 300-400MB. After 5 minutes expire, a new thread is
>>> launched that creates an ibis::part, runs reorder, deletes the part,
>>> creates an ibis::table which is used to create indexes, and then
>>> deletes the table. After that the thread ends.
>>>
>>> Since there is data from multiple sources, there are multiple threads
>>> that store the data and reorder/index it.
>>>
>>> Two things are bothering me.
>>> First, the deadlock: since the mutex should only synchronize, I wonder
>>> who really holds the lock when both threads are waiting for it.
>>> Second, the memory used by the process constantly grows. After
>>> the parts and tables are deleted, I would expect the memory to be
>>> released as well, since it will not be needed for the next 5 minutes.
>>> Unfortunately, FastBit does not free the memory until it reaches 50%
>>> of total memory, which in our case is 6GB. That is unfortunate, since
>>> what it should really need is about 1GB of memory for reorder in the
>>> worst case, and then the memory should be free for other processes to
>>> use. Is there any way to achieve this? The memory is consumed even
>>> without the reordering, only when building indexes.
>>>
>>> Thank you for the warning about strings. We plan to use them in the
>>> future, so we will have to do without the reorder in that case.
>>>
>>> Petr
>>>
>>> On 29 August 2012 18:28, K. John Wu <[email protected]> wrote:
>>>> Hi, Petr,
>>>>
>>>> From the stack traces, it looks like one thread is trying to free a
>>>> data partition object while another one is trying to reorder the rows
>>>> of presumably another data partition. The first mutex lock is invoked
>>>> from the constructor of a storage object (ibis::fileManager::storage).
>>>> This is invoked because the amount of data in memory (tracked by the
>>>> file manager) is close to the prescribed maximum (maxBytes). The
>>>> second mutex lock is invoked from a function called
>>>> ibis::fileManager::removeCleaner (which is invoked by the destructor
>>>> of an ibis::part object).
>>>>
>>>> Running out of memory seems to be the fundamental problem here.
>>>> Presumably, you only need to do reordering once and your datasets are
>>>> quite large. I would suggest that you use only a single thread to
>>>> reorder your data - this way all the memory will be devoted to a single
>>>> reordering operation.
>>>>
>>>> If you really do have a lot of memory (or each data partition is
>>>> relatively small) and want to do the reordering with multiple threads,
>>>> then delay freeing the ibis::part objects until you are done with all
>>>> reordering operations. The cleaner objects from each data partition
>>>> will make sure each ibis::part object is taking only a minimal amount
>>>> of memory.
>>>>
>>>> A note of warning: the current code only sorts the numerical values;
>>>> any strings or blobs will be left untouched. If your datasets have
>>>> strings or blobs, your datasets will not be coherent after calling the
>>>> function reorder!
>>>>
>>>> John
>>>>
>>>>
>>>> On 8/29/12 4:57 AM, Petr Velan wrote:
>>>>> Hi John,
>>>>>
>>>>> Thank you for all the work that you put into the FastBit library; it
>>>>> allows us to achieve great results!
>>>>>
>>>>> I've bumped into a little bug which might be very hard to reproduce or
>>>>> identify. I'm using two threads to reorder and index data that are
>>>>> already stored on disk. It was OK for a little while, but then it
>>>>> got stuck in a deadlock. Here are gdb traces from both threads,
>>>>> unfortunately without debugging symbols, so the specific files
>>>>> and lines are unknown.
>>>>>
>>>>> We are currently using SVN version 532.
>>>>>
>>>>> (gdb) bt
>>>>> #0 0x00007f8983463054 in __lll_lock_wait () from /lib64/libpthread.so.0
>>>>> #1 0x00007f898345e388 in _L_lock_854 () from /lib64/libpthread.so.0
>>>>> #2 0x00007f898345e257 in pthread_mutex_lock () from /lib64/libpthread.so.0
>>>>> #3 0x00007f898271e074 in ibis::fileManager::storage::storage(unsigned long) () from /usr/lib64/libfastbit.so.0
>>>>> #4 0x00007f898271eb16 in ibis::fileManager::storage::enlarge(unsigned long) () from /usr/lib64/libfastbit.so.0
>>>>> #5 0x00007f898272214f in ibis::fileManager::roFile::doRead(char const*) () from /usr/lib64/libfastbit.so.0
>>>>> #6 0x00007f8982723b4b in ibis::fileManager::getFile(char const*, ibis::fileManager::storage**, ibis::fileManager::ACCESS_PREFERENCE) () from /usr/lib64/libfastbit.so.0
>>>>> #7 0x00007f898273406a in int ibis::fileManager::getFile<unsigned short>(char const*, ibis::array_t<unsigned short>&, ibis::fileManager::ACCESS_PREFERENCE) () from /usr/lib64/libfastbit.so.0
>>>>> #8 0x00007f8981f9f4a5 in ibis::column::actualMinMax(char const*, ibis::bitvector const&, double&, double&) const () from /usr/lib64/libfastbit.so.0
>>>>> #9 0x00007f8981fa3546 in ibis::column::computeMinMax() () from /usr/lib64/libfastbit.so.0
>>>>> #10 0x00007f89827beae6 in ibis::part::gatherSortKeys(ibis::array_t<char const*>&) () from /usr/lib64/libfastbit.so.0
>>>>> #11 0x00007f89827bfc56 in ibis::part::reorder() () from /usr/lib64/libfastbit.so.0
>>>>> #12 0x00007f8982c7e2af in reorder_index(void*) () from /usr/share/ipfixcol/plugins/ipfixcol-fastbit-output.so
>>>>> #13 0x00007f898345c851 in start_thread () from /lib64/libpthread.so.0
>>>>> #14 0x00007f89831aa6dd in next_line () from /lib64/libc.so.6
>>>>> #15 0x0000000000000000 in ?? ()
>>>>> (gdb)
>>>>>
>>>>>
>>>>> (gdb) bt
>>>>> #0 0x00007f8983463054 in __lll_lock_wait () from /lib64/libpthread.so.0
>>>>> #1 0x00007f898345e388 in _L_lock_854 () from /lib64/libpthread.so.0
>>>>> #2 0x00007f898345e257 in pthread_mutex_lock () from /lib64/libpthread.so.0
>>>>> #3 0x00007f898175a6aa in ibis::util::mutexLock::mutexLock(pthread_mutex_t*, char const*) () from /usr/lib64/libfastbit.so.0
>>>>> #4 0x00007f89827177d4 in ibis::fileManager::removeCleaner(ibis::fileManager::cleaner const*) () from /usr/lib64/libfastbit.so.0
>>>>> #5 0x00007f8981735952 in ibis::part::~part() () from /usr/lib64/libfastbit.so.0
>>>>> #6 0x00007f8981735c29 in ibis::part::~part() () from /usr/lib64/libfastbit.so.0
>>>>> #7 0x00007f8982c7e2cd in reorder_index(void*) () from /usr/share/ipfixcol/plugins/ipfixcol-fastbit-output.so
>>>>> #8 0x00007f898345c851 in start_thread () from /lib64/libpthread.so.0
>>>>> #9 0x00007f89831aa6dd in next_line () from /lib64/libc.so.6
>>>>> #10 0x0000000000000000 in ?? ()
>>>>> (gdb)
>>>>>
>>>>>
>>>>> Do you have any idea what might be going on?
>>>>>
>>>>> With regards,
>>>>> Petr Velan
>>>>>
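Petr's two-limit proposal amounts to a high/low watermark policy: let cached file content grow toward one limit while it is in use, then unload idle content down to a smaller limit once it is released. The single-threaded C++11 sketch below shows one way such a policy could work. TwoLimitCache, acquire and release are hypothetical names. This is not how ibis::fileManager is implemented, and a real file manager would also need a mutex around every operation.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <list>
#include <string>
#include <unordered_map>

// Hypothetical two-limit cache: usage may grow up to maxBytes while entries
// are pinned; whenever an entry is released, idle entries are unloaded
// (least recently used first) until usage is at most keepBytes.
class TwoLimitCache {
public:
    TwoLimitCache(std::size_t keepBytes, std::size_t maxBytes)
        : keep_(keepBytes), max_(maxBytes) {
        assert(keepBytes <= maxBytes);
    }

    // Pins `name`, loading `bytes` for it if not cached yet.  Returns false
    // if the load would exceed maxBytes even after unloading idle entries.
    bool acquire(const std::string& name, std::size_t bytes) {
        auto it = entries_.find(name);
        if (it != entries_.end()) {              // already cached: pin again
            ++it->second.users;
            lru_.splice(lru_.end(), lru_, it->second.pos);  // mark as recent
            return true;
        }
        if (bytes > max_) return false;
        if (used_ + bytes > max_) shrinkTo(max_ - bytes);
        if (used_ + bytes > max_) return false;  // too much is still pinned
        lru_.push_back(name);
        entries_.emplace(name, Entry{bytes, 1, std::prev(lru_.end())});
        used_ += bytes;
        return true;
    }

    // Unpins `name`, then trims idle content back down to keepBytes.
    void release(const std::string& name) {
        auto it = entries_.find(name);
        if (it == entries_.end() || it->second.users == 0) return;
        --it->second.users;
        shrinkTo(keep_);
    }

    std::size_t used() const { return used_; }
    bool cached(const std::string& name) const { return entries_.count(name) != 0; }

private:
    struct Entry {
        std::size_t bytes;
        int users;                             // number of active pins
        std::list<std::string>::iterator pos;  // position in lru_
    };

    // Unloads idle entries, oldest first, until used_ <= target.
    void shrinkTo(std::size_t target) {
        for (auto li = lru_.begin(); li != lru_.end() && used_ > target;) {
            auto it = entries_.find(*li);
            if (it->second.users == 0) {
                used_ -= it->second.bytes;
                entries_.erase(it);
                li = lru_.erase(li);
            } else {
                ++li;
            }
        }
    }

    std::size_t keep_, max_, used_ = 0;
    std::list<std::string> lru_;  // front = least recently used
    std::unordered_map<std::string, Entry> entries_;
};
```

With keepBytes = 512 and maxBytes = 10240 (read both as MB), pinned files may push usage toward 10240, and each release trims idle content so that usage drops back to at most 512 plus whatever is still pinned.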
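John's argument that a thread blocked on a condition variable cannot be the one holding the mutex relies on the wait operation releasing the mutex atomically while the thread sleeps. POSIX pthread_cond_wait and C++11 std::condition_variable::wait both behave this way. The sketch below checks that property with the C++ standard library rather than raw pthreads, which is what FastBit uses; mutexFreeWhileWaiting is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns true if another thread could lock `m` while a waiter was blocked
// inside condition_variable::wait() on that same mutex.
bool mutexFreeWhileWaiting() {
    std::mutex m;
    std::condition_variable cv;
    std::atomic<bool> aboutToWait(false);
    bool ready = false;  // guarded by m

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lk(m);
        aboutToWait = true;                  // set while still holding m
        cv.wait(lk, [&] { return ready; });  // unlocks m while blocked
    });

    while (!aboutToWait) std::this_thread::yield();

    // The waiter held m when it set the flag.  If wait() kept holding m,
    // try_lock() would keep failing until the deadline.
    bool acquired = false;
    const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
    while (!acquired && std::chrono::steady_clock::now() < deadline) {
        acquired = m.try_lock();
        if (!acquired) std::this_thread::yield();
    }
    if (!acquired) m.lock();  // unreachable on a conforming implementation
    ready = true;             // written while holding m
    m.unlock();
    cv.notify_one();
    waiter.join();
    return acquired;
}
```

If the mutex were kept across the wait, the polling loop would time out and the function would report false instead of silently hanging.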
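John's warning about reorder can be illustrated with a toy column store. Sorting rows means applying one permutation to every column, so skipping a column leaves its values attached to the wrong rows. The sketch below is only an illustration of that point, not FastBit's ibis::part::reorder. ToyTable, sortOrder, permute and reorderRows are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <string>
#include <vector>

// Toy column-store table: row i is (keys[i], counts[i], names[i]).
struct ToyTable {
    std::vector<std::uint32_t> keys;
    std::vector<std::uint32_t> counts;
    std::vector<std::string> names;
};

// Permutation that sorts rows by key (stable, so ties keep their order).
std::vector<std::size_t> sortOrder(const std::vector<std::uint32_t>& keys) {
    std::vector<std::size_t> perm(keys.size());
    std::iota(perm.begin(), perm.end(), std::size_t(0));
    std::stable_sort(perm.begin(), perm.end(),
                     [&](std::size_t a, std::size_t b) { return keys[a] < keys[b]; });
    return perm;
}

// Applies `perm` to one column: out[i] = col[perm[i]].
template <typename T>
std::vector<T> permute(const std::vector<T>& col, const std::vector<std::size_t>& perm) {
    std::vector<T> out;
    out.reserve(perm.size());
    for (std::size_t i : perm) out.push_back(col[i]);
    return out;
}

// Sorts the table by key.  With includeStrings == false it mimics the
// behavior described in the thread: only numeric columns are permuted.
void reorderRows(ToyTable& t, bool includeStrings) {
    assert(t.keys.size() == t.counts.size() && t.keys.size() == t.names.size());
    const std::vector<std::size_t> perm = sortOrder(t.keys);
    t.keys = permute(t.keys, perm);
    t.counts = permute(t.counts, perm);
    if (includeStrings) t.names = permute(t.names, perm);
}
```

Starting from keys {3, 1, 2} and names {"c", "a", "b"}, a full reorder yields names {"a", "b", "c"}, while the numeric-only reorder leaves "c" on the row whose key is now 1.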
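The setqrand function in the attached program below is the Park-Miller "minimal standard" Lehmer generator (multiplier 16807, modulus 2^31 - 1, seed 1), which is why it reproduces the same sequence on every run. C++11 provides the same recurrence as std::minstd_rand0, so a sketch like the following can cross-check the two. SetqRand and matchesMinstdRand0 are hypothetical names.

```cpp
#include <cassert>
#include <cmath>
#include <random>

// Same recurrence as setqrand() in the attached program:
// seed <- (16807 * seed) mod (2^31 - 1), starting from seed 1.
// Every product stays below 2^53, so the double arithmetic is exact.
class SetqRand {
public:
    int operator()() {
        seed_ = std::fmod(16807.0 * seed_, 2147483647.0);
        return static_cast<int>(seed_);
    }

private:
    double seed_ = 1.0;
};

// True if the first n values of SetqRand equal those of a default-constructed
// std::minstd_rand0 (multiplier 16807, modulus 2147483647, default seed 1).
bool matchesMinstdRand0(int n) {
    assert(n >= 0);
    SetqRand mine;
    std::minstd_rand0 ref;
    for (int i = 0; i < n; ++i)
        if (mine() != static_cast<int>(ref())) return false;
    return true;
}
```

The C++ standard requires the 10000th output of a default-constructed std::minstd_rand0 to be 1043618065, which also serves as a known-answer test for the double-based version.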
/***************************************************************
 *
 * pargen - data generator for the set query benchmark
 * <http://www.cs.umb.edu/~poneil/SetQBM.pdf>
 *
 * usage: setqgen <root-data-dir> <#rows> <#rows-per-dir>
 *
 * It generates the data as raw binary integers in the format that can be
 * directly used by FastBit functions.  If more than one directory is
 * needed, it will generate a set of subdirectories in the root-data-dir
 * with the names that are a concatenation of the root-data-dir name and
 * hexadecimal version of the partition number.
 *
 * Optional 4th and 5th arguments could be given.  When only four arguments
 * are given, if the 4th starts with a decimal digit, the program writes out
 * an extra column of blobs to test the new blob support; otherwise, the 4th
 * argument is taken as a configuration file name to be passed to the
 * initialization function for FastBit.  When five arguments are given, the
 * 4th argument is taken to be the configuration file controlling the
 * initialization of FastBit, and the presence of the 5th parameter informs
 * this program to test the new blob feature.
 ***************************************************************/
#include <ibis.h>       /* ibis::init */
#include <string.h>     /* strrchr, strerror */
#include <stdlib.h>     /* strtod */
#include <stdio.h>      /* fprintf */
#include <ctype.h>      /* isdigit */
#include <errno.h>      /* ENOTSUP */
#include <unistd.h>     /* sleep */
#include <pthread.h>    /* pthread_create, pthread_join */
#include <cmath>        /* fmod, ceil */
#include <iomanip>      /* setprecision, setfill, setw */
#include <iostream>     /* std::cout */
#include <sstream>      /* std::ostringstream */
#include <string>
#include <vector>
#include <memory>       /* std::unique_ptr */

/* number of numeric columns to generate data for; SETQ always has 13,
   with one of them being a sequence number */
#define NUMCOLS 12
/** column cardinalities: # distinct values */
const int colcard[]={2,4,5,10,25,100,1000,10000,40000,100000,250000,500000};
/** column names */
const char *colname[]={"K2", "K4", "K5", "K10", "K25", "K100", "K1K",
                       "K10K", "K40K", "K100K", "K250K", "K500K", "KSEQ",
                       "V"};
static bool addBlobs = false;

/* constants for random numbers */
const static double SETQRAND_MODULUS = 2147483647.0;
const static double SETQRAND_MULTIPLIER = 16807.0;
/* the actual random value */
static double SETQRAND_seed = 1.0;

/** A simple random number generator.  It generates the same sequence of
    numbers each time. */
inline int setqrand(void) {
    SETQRAND_seed = std::fmod(SETQRAND_MULTIPLIER*SETQRAND_seed,
                              SETQRAND_MODULUS);
    return (int)SETQRAND_seed;
} /* setqrand */

/// Fill a row of the data table.  This function holds a mutex lock while
/// it draws the values, so that the set of rows generated by multiple
/// threads is the same as the set generated by a single thread.
static uint64_t fillRow(ibis::table::row& val) {
    static uint64_t seq = 0;
    ibis::util::quietLock lock(&ibis::util::envLock);
    ++ seq;
    val.uintsvalues[3] = (uint32_t)seq;
    val.uintsvalues[2] = (setqrand() % colcard[11]) + 1;
    val.uintsvalues[1] = (setqrand() % colcard[10]) + 1;
    val.uintsvalues[0] = (setqrand() % colcard[9]) + 1;
    val.ushortsvalues[2] = (setqrand() % colcard[8]) + 1;
    val.ushortsvalues[1] = (setqrand() % colcard[7]) + 1;
    val.ushortsvalues[0] = (setqrand() % colcard[6]) + 1;
    val.ubytesvalues[5] = (setqrand() % colcard[5]) + 1;
    val.ubytesvalues[4] = (setqrand() % colcard[4]) + 1;
    val.ubytesvalues[3] = (setqrand() % colcard[3]) + 1;
    val.ubytesvalues[2] = (setqrand() % colcard[2]) + 1;
    val.ubytesvalues[1] = (setqrand() % colcard[1]) + 1;
    val.ubytesvalues[0] = (setqrand() % colcard[0]) + 1;
    if (addBlobs) {
        // construct a raw string object from the character table
        unsigned sz = (unsigned)(ibis::util::rand()*65.0);
        val.blobsvalues[0].copy(ibis::util::charTable, sz);
        char *str = const_cast<char*>(val.blobsvalues[0].address());
        size_t j = (size_t)(ibis::util::rand() * sz);
        while (j < sz) { // add some holes
            str[j] = 0;
            j += 3U + (size_t)(ibis::util::rand() * sz);
        }
    }
    return seq;
} // fillRow

static void initColumns(ibis::tablex& tab, ibis::table::row& val) {
    tab.addColumn(colname[0], ibis::UBYTE);
    tab.addColumn(colname[1], ibis::UBYTE);
    tab.addColumn(colname[2], ibis::UBYTE);
    tab.addColumn(colname[3], ibis::UBYTE);
    tab.addColumn(colname[4], ibis::UBYTE);
    tab.addColumn(colname[5], ibis::UBYTE);
    tab.addColumn(colname[6], ibis::USHORT);
    tab.addColumn(colname[7], ibis::USHORT);
    tab.addColumn(colname[8], ibis::USHORT);
    tab.addColumn(colname[9], ibis::UINT);
    tab.addColumn(colname[10], ibis::UINT);
    tab.addColumn(colname[11], ibis::UINT);
    tab.addColumn(colname[12], ibis::UINT, colname[12],
                  "<binning precision=2 reorder/><encoding equality/>");
    val.clear();
    val.ubytesnames.resize(6);
    val.ubytesvalues.resize(6);
    val.ushortsnames.resize(3);
    val.ushortsvalues.resize(3);
    val.uintsnames.resize(4);
    val.uintsvalues.resize(4);
    if (addBlobs) {
        tab.addColumn(colname[13], ibis::BLOB, "opaque values");
        val.blobsnames.resize(1);
        val.blobsvalues.resize(1);
    }
} // initColumns

struct parameters {
    const char* droot;
    int64_t maxrow, nrpd;
    int ndigits, totcols;
};
static parameters gParam;
static ibis::util::counter pCounter;

extern "C" {
    static void* thrfun(void*) {
        long ierr;
        uint64_t irow=0;
        uint32_t ipart=0;
        ibis::table::row val;
        std::unique_ptr<ibis::tablex> tab(ibis::tablex::create());
        initColumns(*tab, val);
        tab->reserveSpace(gParam.nrpd);
        const uint32_t cap = tab->capacity();
        uint32_t printblock = ibis::util::compactValue(cap*0.1, cap*0.5);
        if (printblock == 0) // guard the modulo below for tiny capacities
            printblock = 1;
        while (irow < (uint64_t)gParam.maxrow) {
            std::string dir;
            ipart = pCounter();
            LOGGER(ibis::gVerbose > 2)
                << "starting to work on partition # " << ipart;
            if (ipart > 1) { // sleep a random number of seconds
                (void) sleep((int)(ipart * ibis::util::rand() + 0.4));
            }

            { // generate a new directory name
                const char* str = strrchr(gParam.droot, '/');
                if (str != 0) {
                    ++ str;
                }
                else {
                    str = gParam.droot;
                }
                std::ostringstream oss;
                oss << gParam.droot << FASTBIT_DIRSEP << str << std::hex
                    << std::setprecision(gParam.ndigits)
                    << std::setw(gParam.ndigits) << std::setfill('0')
                    << ipart;
                dir = oss.str();
            }

            ibis::util::timer mytimer(dir.c_str(), 1);
            while (tab->mRows() < cap &&
                   (irow = fillRow(val)) < (uint64_t)gParam.maxrow) {
                // loop to generate the actual values
                LOGGER(irow % printblock == 0) << " . " << irow;
                ierr = tab->appendRow(val);
                LOGGER(ierr != gParam.totcols && ibis::gVerbose >= 0)
                    << "Warning -- failed to add values of row " << irow
                    << " to the in-memory table for " << dir
                    << ", appendRow returned " << ierr;
            }
            if (tab->mRows() > 0) { // write the entries to dir
                ierr = tab->write(dir.c_str());
                if (ierr < 0) {
                    LOGGER(ibis::gVerbose > 0)
                        << "Warning -- failed to write " << tab->mRows()
                        << " rows to directory " << dir
                        << ", the function write returned " << ierr;
                }
                else {
                    LOGGER(ibis::gVerbose > 1)
                        << "Completed writing " << tab->mRows()
                        << " rows to directory " << dir;
                }
                tab->clearData();
            }

            { // reorder the new partition; mypart is freed at the closing brace
                ibis::part mypart(dir.c_str(), (const char*)0, true);
                if (mypart.nRows() > 0 && mypart.nColumns() > 0) {
                    ierr = mypart.reorder();
                    LOGGER(ibis::gVerbose > 0)
                        << "Reordering " << dir << " returned " << ierr;
                }
            }
        }
        return (void*)0;
    }
}

int main(int argc, char **argv) {
    unsigned nthr = 3;
    int ierr, nparts;
    const char *cf = 0;
    std::vector<pthread_t> tid;
    pthread_attr_t attr;
    /* get the number of rows to generate */
    if (argc < 3) {
        fprintf(stderr, "Usage: setqgen <fastbit-data-dir> <#rows> [<#rows-per-dir>] \n"
                "\tIf the third argument is not provided, this program will "
                "put 10 million rows in a directory\n");
        return -1;
    }
    if (argc > 5) {
        addBlobs = true;
        for (int j = 4; cf == 0 && j < argc; ++ j) {
            if (isdigit(*argv[j]) == 0)
                cf = argv[j];
        }
    }
    else if (argc == 5) {
        if (isdigit(*argv[4]) == 0)
            cf = argv[4];
        else
            addBlobs = true;
    }

    ibis::gVerbose = 5;
    ibis::init(cf); // initialize the file manager
    gParam.maxrow = strtod(argv[2], 0);
    if (gParam.maxrow <= 0) {
        // determine the number of rows based on cache size
        gParam.maxrow = ibis::fileManager::currentCacheSize();
        // the queries in doTest of thula.cpp need 4 sets of doubles, which
        // amounts to 32 bytes per row, the Set Query Benchmark data takes
        // 28 bytes per row, the choice below allows all intermediate
        // results to fit in the memory cache
        gParam.maxrow = ibis::util::compactValue(gParam.maxrow / 80.0,
                                                 gParam.maxrow / 60.0);
    }
    if (gParam.maxrow < 10) /* generate at least 10 rows */
        gParam.maxrow = 10;
    if (argc > 3) {
        gParam.nrpd = strtod(argv[3], 0);
        if (gParam.nrpd < 2)
            gParam.nrpd = ibis::util::compactValue(gParam.maxrow / 10.0, 1e7);
    }
    else {
        gParam.nrpd = (gParam.maxrow > 10000000 ? 10000000 : gParam.maxrow);
    }
    std::cout << argv[0] << " " << argv[1] << " " << gParam.maxrow << " "
              << gParam.nrpd << std::endl;
    gParam.totcols = NUMCOLS+1;
    if (addBlobs) {
        std::cout << "with an additional blob column named " << colname[13]
                  << std::endl;
        ++ gParam.totcols;
    }

    nparts = gParam.maxrow / gParam.nrpd;
    nparts += (gParam.maxrow > nparts*gParam.nrpd);
    if (nparts < (int)nthr)
        nparts = nthr;
    ierr = nparts;
    for (gParam.ndigits = 1, ierr >>= 4; ierr > 0;
         ierr >>= 4, ++ gParam.ndigits);
    gParam.droot = argv[1];

    if (nthr > 1) { // the main thread also runs thrfun
        -- nthr;
    }
    else {
        nthr = 0;
    }
    tid.resize(nthr);
    ierr = pthread_attr_init(&attr);
    if (ierr == 0) {
#if defined(PTHREAD_SCOPE_SYSTEM)
        ierr = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
        if (ierr != 0
#if defined(ENOTSUP)
            && ierr != ENOTSUP
#endif
            ) {
            LOGGER(ibis::gVerbose > 1)
                << "Warning -- " << *argv << " pthread_attr_setscope failed "
                "to set system scope (ierr = " << ierr << ')';
        }
#endif
        for (unsigned i = 0; i < nthr; ++i) {
            ierr = pthread_create(&(tid[i]), &attr, thrfun, (void*)0);
            LOGGER(0 != ierr && ibis::gVerbose > 0)
                << "Warning -- " << *argv << " failed to start the thread # "
                << i << " to run thrfun (" << strerror(ierr) << ')';
        }
        pthread_attr_destroy(&attr);
    }
    else {
        LOGGER(ibis::gVerbose > 2)
            << *argv << " -- pthread_attr_init failed with " << ierr
            << ", using default attributes";
        for (unsigned i = 0; i < nthr; ++i) {
            ierr = pthread_create(&(tid[i]), 0, thrfun, (void*)0);
            LOGGER(0 != ierr && ibis::gVerbose > 0)
                << "Warning -- " << *argv << " failed to start the thread # "
                << i << " to run thrfun (" << strerror(ierr) << ')';
        }
    }

    (void) thrfun((void*)0);
    for (unsigned i = 0; i < nthr; ++ i) {
        void *j;
        pthread_join(tid[i], &j);
        LOGGER(j != 0 && ibis::gVerbose > 0)
            << "Warning -- " << *argv << " -- thread # " << i
            << " returned a nonzero code " << j;
    }
    return 0;
} /* main */
_______________________________________________
FastBit-users mailing list
[email protected]
https://hpcrdm.lbl.gov/cgi-bin/mailman/listinfo/fastbit-users
