Hi, Petr,

Attached is a modification of the file tests/setqgen.cpp to mimic your
use case.  So far, it seems to produce exactly the same output set (as
a whole, not in the individual partitions) as produced by
tests/setqgen.cpp.  It works OK on my laptop.  Would you mind taking a
look and seeing if you can get it to behave more like what your program
does?

Thanks.

John


On Thu, Aug 30, 2012 at 11:45 PM, Petr Velan <[email protected]> wrote:
> Hi John,
>
> The memory management in FastBit works well for batch-mode operation;
> however, I need to reduce the memory footprint, since other operations
> may require a large amount of memory for a short time, so FastBit
> cannot keep it allocated all the time.
>
> I know that there is a limit that allows FastBit to use at most half of
> the available memory, and that it can be changed. I think it would be a
> good idea to have two limits: one setting the maximum that can be used,
> and another that would trigger the unload function. I could then set
> FastBit to keep 0.5GB ready and allow it to expand to 10GB. When
> needed, FastBit would use up to 10GB of memory, but afterwards it would
> free the excess and keep only 0.5GB for further use. The default could
> still be to have both limits at half of the available memory. What do
> you think?
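The two-limit policy proposed in the paragraph above can be sketched as a small C++ class. This is a hypothetical illustration, not part of the FastBit API: the class name TwoLimitCache and its acquire/release methods are made up, and sizes are plain byte counts. When a new file would push usage past the hard limit (maxBytes), idle entries are evicted, oldest first, down to the retention target (keepBytes); releasing an entry also trims idle memory back to that target right away.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <list>
#include <string>

// Hypothetical two-limit cache; NOT part of the FastBit API.
// keepBytes: idle data retained for reuse; maxBytes: hard ceiling.
class TwoLimitCache {
public:
    TwoLimitCache(std::size_t keepBytes, std::size_t maxBytes)
        : keep_(keepBytes), max_(maxBytes), total_(0) {}

    // Bring a file of 'bytes' into memory and mark it in use.  Returns
    // false if it cannot fit even after evicting every idle entry.
    bool acquire(const std::string& name, std::size_t bytes) {
        if (bytes > max_)
            return false;                          // can never fit
        if (total_ + bytes > max_)
            trimTo(std::min(keep_, max_ - bytes)); // down to keepBytes, or enough to make room
        if (total_ + bytes > max_)
            return false;                          // the rest is in active use
        entries_.push_back(Entry{name, bytes, true});
        total_ += bytes;
        return true;
    }

    // Mark an entry idle and give back idle memory above keepBytes at once.
    void release(const std::string& name) {
        for (Entry& e : entries_)
            if (e.name == name)
                e.inUse = false;
        trimTo(keep_);
    }

    // Bytes currently held, whether in use or idle.
    std::size_t bytesHeld() const { return total_; }

private:
    struct Entry {
        std::string name;
        std::size_t bytes;
        bool inUse;
    };

    // Evict idle entries, oldest first, until total_ <= target.
    void trimTo(std::size_t target) {
        for (std::list<Entry>::iterator it = entries_.begin();
             it != entries_.end() && total_ > target;) {
            if (it->inUse) {
                ++it;
            } else {
                total_ -= it->bytes;
                it = entries_.erase(it);
            }
        }
    }

    std::size_t keep_, max_, total_;
    std::list<Entry> entries_;
};
```

Setting both limits to the same value keeps idle data around until the space is actually needed, which resembles the lazy behavior John describes further down this thread; setting keepBytes well below maxBytes hands idle memory back promptly, which is what Petr is asking for.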
>
> We are currently trying to manually call
> ibis::fileManager::instance().flushDir() to see if it helps to keep
> the memory down, but I believe that the solution I described earlier
> is much more generic.
>
> Unfortunately, I do not have any simple code to reproduce the
> deadlock. I'll try to look into it, perhaps by compiling FastBit with
> debugging symbols to help us better understand what is going on.
>
> Petr
>
> On 30 August 2012 20:29, K. John Wu <[email protected]> wrote:
>> Hi, Petr,
>>
>> Thanks for clarifying the use case.  It looks like you cannot wait for
>> everything to be done before releasing the ibis::part objects.
>> Regarding the memory usage, FastBit does lazy deletion: as long as
>> no one needs new memory, the content already read from files is
>> kept in memory.  The default maximum memory to be used is half of
>> the physical memory, which explains what you've observed.  Once
>> that limit is reached, ibis::fileManager::unload is called to
>> remove the content of files that are not in active use.  In your case,
>> it sounds like there will be a lot of old files to remove from memory.
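To see where a figure like the 6GB in Petr's message comes from, the following sketch computes half of the physical memory. It is an illustration only, not FastBit's own code: the function name halfOfPhysicalMemory is made up, and it relies on sysconf with _SC_PHYS_PAGES, which glibc and several BSDs provide but POSIX does not require. (The attached program sizes its data with ibis::fileManager::currentCacheSize() instead.)

```cpp
#include <cassert>
#include <cstdint>
#include <unistd.h>

// Half of the physical memory in bytes, or 0 if it cannot be determined.
// Illustration only; FastBit computes its own default internally.
std::uint64_t halfOfPhysicalMemory() {
    const long pages = sysconf(_SC_PHYS_PAGES);   // not in strict POSIX
    const long pageSize = sysconf(_SC_PAGESIZE);
    if (pages <= 0 || pageSize <= 0)
        return 0;
    return static_cast<std::uint64_t>(pages) *
           static_cast<std::uint64_t>(pageSize) / 2;
}
```

On a machine with 12GB of RAM this gives roughly 6GB, in line with what Petr observed.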
>>
>> Since there is no clear indication which thread is holding on to the
>> mutex lock, we might need to create a multithreaded data generator
>> that can mimic your data ingestion process.  If you have a simple one
>> that I can borrow, I would greatly appreciate it.
>>
>> Most likely, another copy of ibis::fileManager::getFile is holding on
>> to the ibis::fileManager::mutex.  However, logically, that is not
>> possible because that thread can only be waiting on a condition
>> variable, in which case it should have yielded the mutex lock already.
>> Anyway, something gnarly is going on here.
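The reasoning in the paragraph above relies on a property of condition variables: a thread blocked in wait() has released the associated mutex and reacquires it only when it wakes up. The following sketch checks that property with standard C++11 primitives, which stand in here for the POSIX condition variable; the function name waitReleasesMutex is made up for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns true once a second thread has locked the mutex while the first
// thread was blocked in wait().  If wait() kept the mutex locked, the
// polling loop below could never observe 'waiting' and this would hang.
bool waitReleasesMutex() {
    std::mutex m;
    std::condition_variable cv;
    bool waiting = false;   // set by the waiter while it holds m
    bool ready = false;     // the condition the waiter is waiting for

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lk(m);
        waiting = true;                       // still holding m here ...
        cv.wait(lk, [&] { return ready; });   // ... wait() unlocks m while blocked
    });                                       // and relocks it before returning

    bool sawWaiter = false;
    while (!sawWaiter) {
        std::lock_guard<std::mutex> lk(m);    // succeeds only while m is free
        if (waiting) {                        // so the waiter must be inside wait()
            ready = true;
            sawWaiter = true;
        }
    }                                         // lk is released at the end of each pass
    cv.notify_one();
    waiter.join();
    return sawWaiter && ready;
}
```

Because the waiter sets the flag and enters wait() without ever dropping the mutex in between, seeing the flag while holding the mutex proves the waiter had released it inside wait().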
>>
>> John
>>
>>
>> On 8/29/12 10:54 PM, Petr Velan wrote:
>>> Hi John,
>>>
>>> I still do not understand why there is a deadlock, or why access to
>>> different partitions is managed by the same mutex lock.
>>>
>>> Our use case is this:
>>> We have a process that collects data from the network and stores it in
>>> FastBit partitions. Each partition contains 5 minutes of data,
>>> approximately 300-400MB. When the 5 minutes expire, a new thread is
>>> launched that creates an ibis::part, runs reorder, deletes the part,
>>> creates an ibis::table, which is used to create indexes, and then
>>> deletes the table. After that, the thread ends.
>>>
>>> Since there is data from multiple sources, there are multiple threads
>>> that store the data and reorder/index it.
>>>
>>> Two things are bothering me.
>>> First, the deadlock: since the mutex should only synchronize, I wonder
>>> who really holds the lock when both threads are waiting for it.
>>> Second, the memory used by the process grows constantly. After
>>> the parts and tables are deleted, I would expect the memory to be
>>> released as well, since it will not be needed for the next 5 minutes.
>>> Unfortunately, FastBit does not free the memory until it reaches 50%
>>> of total memory, which in our case is 6GB. That is unfortunate,
>>> since what it should really need is about 1GB of memory for reorder
>>> in the worst case, and then the memory should be free for other
>>> processes to use. Is there any way to achieve this? The memory is
>>> consumed even without the reordering, when only building indexes.
>>>
>>> Thank you for the warning about strings. We plan to use them in the
>>> future, so we will have to do without the reorder in that case.
>>>
>>> Petr
>>>
>>> On 29 August 2012 18:28, K. John Wu <[email protected]> wrote:
>>>> Hi, Petr,
>>>>
>>>> From the stack traces, it looks like one thread is trying to free a data
>>>> partition object while another one is trying to reorder the rows of
>>>> presumably another data partition.  The first mutex lock is invoked
>>>> from the constructor of a storage object (ibis::fileManager::storage).
>>>> This is invoked because the amount of data in memory (tracked by the
>>>> file manager) is close to the prescribed maximum (maxBytes).  The
>>>> second mutex lock is invoked from a function called
>>>> ibis::fileManager::removeCleaner (which is invoked by the destructor
>>>> of an ibis::part object).
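Both backtraces stop in pthread_mutex_lock. If both threads are waiting on the same mutex, its owner is either a third thread or one of these two threads itself: with a default (non-recursive) mutex, a thread that locks a mutex it already owns can block forever, and in gdb that looks exactly like waiting for someone else. On glibc systems, a build with debugging symbols lets gdb print the owner's thread id from the mutex's __data.__owner field. Another way to catch such a relock, sketched here with plain POSIX calls (the function name relockResult is hypothetical), is a mutex of type PTHREAD_MUTEX_ERRORCHECK, which reports the relock instead of hanging.

```cpp
#include <cassert>
#include <cerrno>
#include <pthread.h>

// Locks an error-checking mutex twice from the same thread and returns the
// result of the second attempt.  POSIX requires EDEADLK for this mutex type;
// a default mutex might instead block forever.
int relockResult() {
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);

    pthread_mutex_t mtx;
    pthread_mutex_init(&mtx, &attr);
    pthread_mutexattr_destroy(&attr);  // does not affect the initialized mutex

    pthread_mutex_lock(&mtx);                     // first lock succeeds
    const int second = pthread_mutex_lock(&mtx);  // relock is reported, not hung
    pthread_mutex_unlock(&mtx);
    pthread_mutex_destroy(&mtx);
    return second;
}
```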
>>>>
>>>> Running out of memory seems to be the fundamental problem here.
>>>> Presumably, you only need to do reordering once and your datasets are
>>>> quite large.  I would suggest that you use only a single thread to
>>>> reorder your data; this way all the memory will be devoted to a single
>>>> reordering operation.
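One way to follow the single-reorder-thread suggestion while still ingesting data in parallel is to serialize only the reorder step behind one mutex. A minimal sketch in standard C++, where reorderPartition is a hypothetical stand-in for the real call (for example ibis::part::reorder()) and the counters exist only to check that no two reorders overlap:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>
#include <vector>

static std::mutex gReorderMutex;     // at most one reorder at a time
static std::atomic<int> gActive(0);  // reorders running right now
static std::atomic<int> gPeak(0);    // largest value gActive ever reached

// Hypothetical stand-in for the memory-hungry reordering step.
void reorderPartition(int /*partitionId*/) {
    std::lock_guard<std::mutex> guard(gReorderMutex);
    const int now = ++gActive;
    int peak = gPeak.load();
    while (now > peak && !gPeak.compare_exchange_weak(peak, now)) {
        // on failure 'peak' is refreshed with the stored value; retry
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(5)); // simulated work
    --gActive;
}

// Starts one thread per partition.  Writing data and building indexes
// (shown as comments) may overlap, but the reorder step cannot.
int runPartitions(int nPartitions) {
    std::vector<std::thread> workers;
    for (int i = 0; i < nPartitions; ++i) {
        workers.emplace_back([i] {
            // ... write this partition's data (concurrent) ...
            reorderPartition(i);
            // ... build this partition's indexes (concurrent) ...
        });
    }
    for (std::thread& t : workers)
        t.join();
    return gPeak.load();  // 1 means the reorders never overlapped
}
```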
>>>>
>>>> If you really do have a lot of memory (or each data partition is
>>>> relatively small) and want to do the reordering with multiple threads,
>>>> then delay freeing the ibis::part objects until you are done with all
>>>> reordering operations.  The cleaner objects from
>>>> each data partition will make sure each ibis::part object is taking
>>>> only a minimal amount of memory.
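The alternative of delaying destruction can also be sketched in a few lines: keep every partition object alive in a container, join all reorder threads, and only then destroy the objects. Partition is a hypothetical stand-in for ibis::part (whose destructor, according to the second backtrace in this thread, calls ibis::fileManager::removeCleaner), and the destruction counter exists only to check the ordering.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <thread>
#include <vector>

static std::atomic<int> gDestroyed(0);

// Hypothetical stand-in for ibis::part.
struct Partition {
    explicit Partition(int pid) : id(pid) {}
    ~Partition() { ++gDestroyed; }
    void reorder() { /* reorder this partition's rows */ }
    int id;
};

// Reorders all partitions in parallel and destroys none of them until
// every reorder thread has been joined.  Returns the number of objects
// destroyed before that point (expected: 0).
int reorderThenRelease(int n) {
    std::vector<std::unique_ptr<Partition>> parts;
    for (int i = 0; i < n; ++i)
        parts.push_back(std::unique_ptr<Partition>(new Partition(i)));

    std::vector<std::thread> workers;
    for (std::unique_ptr<Partition>& p : parts) {
        Partition* raw = p.get();  // the vector still owns the object
        workers.emplace_back([raw] { raw->reorder(); });
    }
    for (std::thread& t : workers)
        t.join();

    const int destroyedEarly = gDestroyed.load();
    parts.clear();  // all destructors run here, after the joins
    return destroyedEarly;
}
```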
>>>>
>>>> A note of warning: the current code only sorts the numerical values;
>>>> any strings or blobs will be left untouched.  If your datasets have
>>>> strings or blobs, they will not be coherent after calling the
>>>> function reorder!
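The reason for this warning is that reordering rows means computing one permutation from the sort keys and applying it to every column; a column that is skipped keeps its old order and no longer lines up with the others. A minimal sketch with plain std::vector columns (not FastBit's data structures), where the function names applyPermutation and reorderRows are illustrative:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <string>
#include <vector>

// out[i] = in[perm[i]]: one column rearranged by a row permutation.
template <typename T>
std::vector<T> applyPermutation(const std::vector<T>& in,
                                const std::vector<std::size_t>& perm) {
    std::vector<T> out;
    out.reserve(perm.size());
    for (std::size_t src : perm)
        out.push_back(in[src]);
    return out;
}

// Sorts the rows by 'key' and moves the string column along with it.
void reorderRows(std::vector<int>& key, std::vector<std::string>& label) {
    std::vector<std::size_t> perm(key.size());
    std::iota(perm.begin(), perm.end(), std::size_t(0));
    std::stable_sort(perm.begin(), perm.end(),
                     [&key](std::size_t a, std::size_t b) { return key[a] < key[b]; });
    key = applyPermutation(key, perm);
    label = applyPermutation(label, perm);  // omit this and the rows fall apart
}
```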
>>>>
>>>> John
>>>>
>>>>
>>>> On 8/29/12 4:57 AM, Petr Velan wrote:
>>>>> Hi John,
>>>>>
>>>>> thank you for all the work that you put into the FastBit library; it
>>>>> allows us to achieve great results!
>>>>>
>>>>> I've bumped into a little bug which might be very hard to reproduce or
>>>>> identify. I'm using two threads to reorder and index data that are
>>>>> already stored on disk. It was OK for a little while, but then it got
>>>>> stuck in a deadlock. Here are gdb traces from both threads,
>>>>> unfortunately without debugging symbols, so the specific files
>>>>> and lines are unknown.
>>>>>
>>>>> We are currently using the SVN version 532.
>>>>>
>>>>> (gdb) bt
>>>>> #0  0x00007f8983463054 in __lll_lock_wait () from /lib64/libpthread.so.0
>>>>> #1  0x00007f898345e388 in _L_lock_854 () from /lib64/libpthread.so.0
>>>>> #2  0x00007f898345e257 in pthread_mutex_lock () from /lib64/libpthread.so.0
>>>>> #3  0x00007f898271e074 in ibis::fileManager::storage::storage(unsigned long) () from /usr/lib64/libfastbit.so.0
>>>>> #4  0x00007f898271eb16 in ibis::fileManager::storage::enlarge(unsigned long) () from /usr/lib64/libfastbit.so.0
>>>>> #5  0x00007f898272214f in ibis::fileManager::roFile::doRead(char const*) () from /usr/lib64/libfastbit.so.0
>>>>> #6  0x00007f8982723b4b in ibis::fileManager::getFile(char const*, ibis::fileManager::storage**, ibis::fileManager::ACCESS_PREFERENCE) () from /usr/lib64/libfastbit.so.0
>>>>> #7  0x00007f898273406a in int ibis::fileManager::getFile<unsigned short>(char const*, ibis::array_t<unsigned short>&, ibis::fileManager::ACCESS_PREFERENCE) () from /usr/lib64/libfastbit.so.0
>>>>> #8  0x00007f8981f9f4a5 in ibis::column::actualMinMax(char const*, ibis::bitvector const&, double&, double&) const () from /usr/lib64/libfastbit.so.0
>>>>> #9  0x00007f8981fa3546 in ibis::column::computeMinMax() () from /usr/lib64/libfastbit.so.0
>>>>> #10 0x00007f89827beae6 in ibis::part::gatherSortKeys(ibis::array_t<char const*>&) () from /usr/lib64/libfastbit.so.0
>>>>> #11 0x00007f89827bfc56 in ibis::part::reorder() () from /usr/lib64/libfastbit.so.0
>>>>> #12 0x00007f8982c7e2af in reorder_index(void*) () from /usr/share/ipfixcol/plugins/ipfixcol-fastbit-output.so
>>>>> #13 0x00007f898345c851 in start_thread () from /lib64/libpthread.so.0
>>>>> #14 0x00007f89831aa6dd in next_line () from /lib64/libc.so.6
>>>>> #15 0x0000000000000000 in ?? ()
>>>>> (gdb)
>>>>>
>>>>>
>>>>> (gdb) bt
>>>>> #0  0x00007f8983463054 in __lll_lock_wait () from /lib64/libpthread.so.0
>>>>> #1  0x00007f898345e388 in _L_lock_854 () from /lib64/libpthread.so.0
>>>>> #2  0x00007f898345e257 in pthread_mutex_lock () from /lib64/libpthread.so.0
>>>>> #3  0x00007f898175a6aa in ibis::util::mutexLock::mutexLock(pthread_mutex_t*, char const*) () from /usr/lib64/libfastbit.so.0
>>>>> #4  0x00007f89827177d4 in ibis::fileManager::removeCleaner(ibis::fileManager::cleaner const*) () from /usr/lib64/libfastbit.so.0
>>>>> #5  0x00007f8981735952 in ibis::part::~part() () from /usr/lib64/libfastbit.so.0
>>>>> #6  0x00007f8981735c29 in ibis::part::~part() () from /usr/lib64/libfastbit.so.0
>>>>> #7  0x00007f8982c7e2cd in reorder_index(void*) () from /usr/share/ipfixcol/plugins/ipfixcol-fastbit-output.so
>>>>> #8  0x00007f898345c851 in start_thread () from /lib64/libpthread.so.0
>>>>> #9  0x00007f89831aa6dd in next_line () from /lib64/libc.so.6
>>>>> #10 0x0000000000000000 in ?? ()
>>>>> (gdb)
>>>>>
>>>>>
>>>>> Do you have any idea what might be going on?
>>>>>
>>>>> With regards,
>>>>> Petr Velan
>>>>>
/***************************************************************
 *
 * pargen - data generator for the set query benchmark
 * <http://www.cs.umb.edu/~poneil/SetQBM.pdf>
 *
 * usage: setqgen <root-data-dir> <#rows> <#rows-per-dir>
 *
 * It generates the data as raw binary integers in the format that can be
 * directly used by FastBit functions.  If more than one directory is
 * needed, it will generate a set of subdirectories in the root-data-dir
 * with the names that are a concatenation of the root-data-dir name and
 * hexadecimal version of the partition number.
 *
 * Optional 4th and 5th arguments could be given.  When only four arguments
 * are given, if the 4th one starts with a decimal digit, the program writes
 * out an extra column of blobs to test the new blob support; otherwise, the
 * 4th argument is taken as a configuration file name to be passed to the
 * initialization function for FastBit.  When five arguments are given, the
 * 4th argument is taken to be the configuration file controlling the
 * initialization of FastBit, and the presence of the 5th parameter informs
 * this program to test the new blob feature.
 ***************************************************************/ 
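#include <ibis.h>	/* ibis::init */
#include <string.h>	/* strrchr, strerror */
#include <ctype.h>	/* isdigit */
#include <errno.h>	/* ENOTSUP */
#include <pthread.h>	/* pthread_create, pthread_join */
#include <unistd.h>	/* sleep */
#include <stdlib.h>
#include <stdio.h>
#include <cmath>	/* fmod, ceil */
#include <iomanip>	/* setprecision, setfill */
#include <memory>	/* std::auto_ptr */
#include <sstream>	/* std::ostringstream */
#include <vector>	/* std::vector */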

/* the SETQ data has 13 numeric columns: NUMCOLS (12) columns of random
   values plus one sequence number */
#define NUMCOLS 12

/** column cardinalities:  # distinct values*/
const int colcard[]={2,4,5,10,25,100,1000,10000,40000,100000,250000,500000};
/** column names */
const char *colname[]={"K2", "K4", "K5", "K10", "K25", "K100", "K1K", "K10K",
		       "K40K", "K100K", "K250K", "K500K", "KSEQ", "V"};

static bool addBlobs = false;

/* constants for random numbers */
const static double SETQRAND_MODULUS = 2147483647.0;
const static double SETQRAND_MULTIPLIER = 16807.0;
/* the actual random value */
static double SETQRAND_seed = 1.0;
/** A simple random number generator.  It generates the same sequence of
    numbers each time. */
inline int setqrand(void) {
    SETQRAND_seed = std::fmod(SETQRAND_MULTIPLIER*SETQRAND_seed,
			      SETQRAND_MODULUS);
    return (int)SETQRAND_seed;
} /* setqrand */

/// Fill a row of the data table.  This function uses a mutex lock so that
/// the rows generated by multiple threads, taken together, are the same as
/// those generated by a single thread.
static uint64_t fillRow(ibis::table::row& val) {
    static uint64_t seq = 0;
    ibis::util::quietLock lock(&ibis::util::envLock);
    ++ seq;
    val.uintsvalues[3]   = (uint32_t)seq;
    val.uintsvalues[2]   = (setqrand() % colcard[11]) + 1;
    val.uintsvalues[1]   = (setqrand() % colcard[10]) + 1;
    val.uintsvalues[0]   = (setqrand() % colcard[9]) + 1;
    val.ushortsvalues[2] = (setqrand() % colcard[8]) + 1;
    val.ushortsvalues[1] = (setqrand() % colcard[7]) + 1;
    val.ushortsvalues[0] = (setqrand() % colcard[6]) + 1;
    val.ubytesvalues[5]  = (setqrand() % colcard[5]) + 1;
    val.ubytesvalues[4]  = (setqrand() % colcard[4]) + 1;
    val.ubytesvalues[3]  = (setqrand() % colcard[3]) + 1;
    val.ubytesvalues[2]  = (setqrand() % colcard[2]) + 1;
    val.ubytesvalues[1]  = (setqrand() % colcard[1]) + 1;
    val.ubytesvalues[0]  = (setqrand() % colcard[0]) + 1;
    if (addBlobs) { // construct a raw string object from the character table
	unsigned sz = (unsigned)(ibis::util::rand()*65.0);
	val.blobsvalues[0].copy(ibis::util::charTable, sz);
	char *str = const_cast<char*>(val.blobsvalues[0].address());
	size_t j = (size_t)(ibis::util::rand() * sz);
	while (j < sz) { // add some holes
	    str[j] = 0;
	    j += 3U + (size_t)(ibis::util::rand() * sz);
	}
    }
    return seq;
} // fillRow

static void initColumns(ibis::tablex& tab, ibis::table::row& val) {
    tab.addColumn(colname[0], ibis::UBYTE);
    tab.addColumn(colname[1], ibis::UBYTE);
    tab.addColumn(colname[2], ibis::UBYTE);
    tab.addColumn(colname[3], ibis::UBYTE);
    tab.addColumn(colname[4], ibis::UBYTE);
    tab.addColumn(colname[5], ibis::UBYTE);
    tab.addColumn(colname[6], ibis::USHORT);
    tab.addColumn(colname[7], ibis::USHORT);
    tab.addColumn(colname[8], ibis::USHORT);
    tab.addColumn(colname[9], ibis::UINT);
    tab.addColumn(colname[10], ibis::UINT);
    tab.addColumn(colname[11], ibis::UINT);
    tab.addColumn(colname[12], ibis::UINT, colname[12],
		  "<binning precision=2 reorder/><encoding equality/>");

    val.clear();
    val.ubytesnames.resize(6);
    val.ubytesvalues.resize(6);
    val.ushortsnames.resize(3);
    val.ushortsvalues.resize(3);
    val.uintsnames.resize(4);
    val.uintsvalues.resize(4);

    if (addBlobs) {
	tab.addColumn(colname[13], ibis::BLOB, "opaque values");
	val.blobsnames.resize(1);
	val.blobsvalues.resize(1);
    }
} // initColumns

struct parameters {
    const char* droot;
    int64_t maxrow, nrpd;
    int ndigits, totcols;
};
static parameters gParam;
static ibis::util::counter pCounter;

extern "C" {
    static void* thrfun(void*) {
	long ierr;
	uint64_t irow=0;
	uint32_t ipart=0;
	ibis::table::row val;
	std::auto_ptr<ibis::tablex> tab(ibis::tablex::create());
	initColumns(*tab, val);
	tab->reserveSpace(gParam.nrpd);
	const uint32_t cap = tab->capacity();
	const uint32_t printblock = ibis::util::compactValue(cap*0.1, cap*0.5);

	while (irow < gParam.maxrow) {
	    std::string dir;
	    ipart = pCounter();
	    LOGGER(ibis::gVerbose > 2)
		<< "starting to work on partition # " << ipart;
	    if (ipart > 1) { // sleep for a random number of seconds
		(void) sleep((int)(ipart * ibis::util::rand() + 0.4));
	    }
	    { // generate a new directory name
		const char* str = strrchr(gParam.droot, '/');
		if (str != 0) {
		    ++ str;
		}
		else {
		    str = gParam.droot;
		}
		std::ostringstream oss;
		oss << gParam.droot << FASTBIT_DIRSEP << str << std::hex
		    << std::setprecision(gParam.ndigits)
		    << std::setw(gParam.ndigits)
		    << std::setfill('0') << ipart;
		dir = oss.str();
	    }

	    ibis::util::timer mytimer(dir.c_str(), 1);
	    while (tab->mRows() < cap &&
		   (irow = fillRow(val)) <= gParam.maxrow) {
		// loop to generate the actual values; fillRow returns 1-based
		// sequence numbers, so "<=" keeps the row numbered maxrow
		LOGGER(irow % printblock == 0) << " . " << irow;

		ierr = tab->appendRow(val);
		LOGGER(ierr != gParam.totcols && ibis::gVerbose >= 0)
		    << "Warning -- failed to add values of row "
		    << irow << " to the in-memory table for " << dir
		    << ", appendRow returned " << ierr;
	    }

	    if (tab->mRows() > 0) { // write the entries to dir
		ierr = tab->write(dir.c_str());
		if (ierr < 0) {
		    LOGGER(ibis::gVerbose > 0)
			<< "Warning -- failed to write " << tab->mRows()
			<< " rows to directory " << dir
			<< ", the function write returned " << ierr;
		}
		else {
		    LOGGER(ibis::gVerbose > 1)
			<< "Completed writing " << tab->mRows()
			<< " rows to directory " << dir;
		}

		tab->clearData();
	    }

	    {
		ibis::part mypart(dir.c_str(), (const char*)0, true);
		if (mypart.nRows() > 0 && mypart.nColumns() > 0) {
		    ierr = mypart.reorder();
		    LOGGER(ibis::gVerbose > 0)
			<< "Reordering " << dir << " returned " << ierr;
		}
	    }
	}

	return (void*)0;
    }
}

int main(int argc, char **argv) {
    unsigned nthr = 3;
    int ierr, nparts;
    const char *cf = 0;
    std::vector<pthread_t> tid;
    pthread_attr_t attr;

    /* get the number of rows to generate */
    if (argc < 3) {
	fprintf(stderr,
		"Usage: setqgen <fastbit-data-dir> <#rows> [<#rows-per-dir>] \n"
		"\tIf the third argument is not provided, this program will "
		"put up to 10 million rows in a directory\n");
	return -1;
    }

    if (argc > 5) {
	addBlobs = true;
	for (int j = 4; cf == 0 && j < argc; ++ j) {
	    if (isdigit(*argv[j]) == 0)
		cf = argv[j];
	}
    }
    else if (argc == 5) {
	if (isdigit(*argv[4]) == 0)
	    cf = argv[4];
	else
	    addBlobs = true;
    }

    ibis::gVerbose = 5;
    ibis::init(cf); // initialize the file manager
    gParam.maxrow = strtod(argv[2], 0);
    if (gParam.maxrow <= 0) { // determine the number of rows based on cache size
	gParam.maxrow = ibis::fileManager::currentCacheSize();
	// the queries in doTest of thula.cpp need 4 sets of doubles, which
	// amounts to 32 bytes per row, the Set Query Benchmark data takes
	// 28 bytes per row, the choice below allows all intermediate
	// results to fit in the memory cache
	gParam.maxrow = ibis::util::compactValue(gParam.maxrow / 80.0,
						 gParam.maxrow / 60.0);
    }
    if (gParam.maxrow < 10) /* generate at least 10 rows */
	gParam.maxrow = 10;
    if (argc > 3) {
	gParam.nrpd = strtod(argv[3], 0);
	if (gParam.nrpd < 2)
	    gParam.nrpd = ibis::util::compactValue(gParam.maxrow / 10.0, 1e7);
    }
    else {
	gParam.nrpd = (gParam.maxrow > 10000000 ? 10000000 : gParam.maxrow);
    }
    std::cout << argv[0] << " " << argv[1] << " " << gParam.maxrow
	      << " " << gParam.nrpd << std::endl;

    gParam.totcols = NUMCOLS+1;
    if (addBlobs) {
	std::cout << "with an additional blob column named " << colname[13]
		  << std::endl;
	++ gParam.totcols;
    }
    nparts = gParam.maxrow / gParam.nrpd;
    nparts += (gParam.maxrow > nparts*gParam.nrpd);
    if (nparts < nthr)
	nparts = nthr;
    ierr = nparts;
    for (gParam.ndigits = 1, ierr >>= 4; ierr > 0;
	 ierr >>= 4, ++ gParam.ndigits);
    gParam.droot = argv[1];

    if (nthr > 1) {
	-- nthr;
    }
    else {
	nthr = 0;
    }
    tid.resize(nthr);
    ierr = pthread_attr_init(&attr);
    if (ierr == 0) {
#if defined(PTHREAD_SCOPE_SYSTEM)
	ierr = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
	if (ierr != 0
#if defined(ENOTSUP)
	    && ierr != ENOTSUP
#endif
	    ) {
	    LOGGER(ibis::gVerbose > 1)
		<< "Warning -- " << *argv << " pthread_attr_setscope failed "
		"to set system scope (ierr = " << ierr << ')';
	}
#endif
	for (unsigned i = 0; i < nthr; ++i) {
	    ierr = pthread_create(&(tid[i]), &attr, thrfun, (void*)0);
	    LOGGER(0 != ierr && ibis::gVerbose > 0)
		<< "Warning -- " << *argv << " failed to start the thread # "
		<< i << " to run thrfun (" << strerror(ierr)
		<< ')';
	}
	(void) pthread_attr_destroy(&attr); // attributes are no longer needed
    }
    else {
	LOGGER(ibis::gVerbose > 2)
	    << *argv << " -- pthread_attr_init failed with " << ierr
	    << ", using default attributes";
	for (unsigned i = 0; i < nthr; ++i) {
	    ierr = pthread_create(&(tid[i]), 0, thrfun, (void*)0);
	    LOGGER(0 != ierr && ibis::gVerbose > 0)
		<< "Warning -- " << *argv << " failed to start the thread # "
		<< i << " to run thrfun (" << strerror(ierr)
		<< ')';
	}
    }
    (void) thrfun((void*)0);
    for (unsigned i = 0; i < nthr; ++ i) {
	void *j;
	pthread_join(tid[i], &j);
	LOGGER(j != 0 && ibis::gVerbose > 0)
	    << "Warning -- " << *argv << " -- thread # "
	    << i << " returned a nonzero code " << j;
    }

    return 0;
} /* main */
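A note on setqrand() in the attached program: it is the Park-Miller "minimal standard" generator (multiplier 16807, modulus 2^31 - 1), the same recurrence as std::minstd_rand0 in C++11, and the double arithmetic stays exact because 16807 times any seed is below 2^53. The following sketch, with a local copy of setqrand and two illustrative helpers (parkMillerNext, generatorsAgree), checks that the double version, a pure-integer version and the standard engine produce the same sequence.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <random>

// Local copy of the attached program's generator:
// seed <- 16807 * seed mod (2^31 - 1), computed in double precision.
static double gSeed = 1.0;
inline int setqrand() {
    gSeed = std::fmod(16807.0 * gSeed, 2147483647.0);
    return static_cast<int>(gSeed);
}

// The same recurrence in 64-bit integer arithmetic.
inline std::uint32_t parkMillerNext(std::uint32_t& state) {
    state = static_cast<std::uint32_t>((16807ULL * state) % 2147483647ULL);
    return state;
}

// True if all three generators, started from seed 1, agree for n draws.
bool generatorsAgree(int n) {
    gSeed = 1.0;
    std::uint32_t state = 1;
    std::minstd_rand0 ref;  // default seed is 1
    for (int i = 0; i < n; ++i) {
        const std::uint32_t a = static_cast<std::uint32_t>(setqrand());
        const std::uint32_t b = parkMillerNext(state);
        const std::uint32_t c = static_cast<std::uint32_t>(ref());
        if (a != b || b != c)
            return false;
    }
    return true;
}
```

The C++ standard requires the 10000th value of a default-constructed std::minstd_rand0 to be 1043618065, which makes a convenient sanity check for setqrand() as well.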

_______________________________________________
FastBit-users mailing list
[email protected]
https://hpcrdm.lbl.gov/cgi-bin/mailman/listinfo/fastbit-users
