Hi.

This is my company, so I reveal what I like, even though the board would
shoot me. But hey, do you think they are scanning this mailing list? :)

The PageRank algorithm itself is very simple (but clever) and can be found
on Wikipedia:
http://en.wikipedia.org/wiki/PageRank
What is painful is calculating it in a distributed architecture. You will
never achieve your goal by using a DB to store the score per node and the
links from/to it (at least we did not).
We use plain Lucene indexes and 10 memcached servers to store the
intermediate scores and run enough iterations for the scores to almost
converge (they never converge completely).

Here you go, a piece of code which I leave for you to examine; we might be
doing it totally wrong.

//Based on: http://en.wikipedia.org/wiki/PageRank
//called from a MR job..
    public void calculatePageRank(FeedScore feedScore) throws Exception
    {
        long source = feedScore.getFeedId();
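        // d is the standard PageRank damping factor; prSum accumulates
        // the sum of PR(S)/outlinks(S) over all source feeds S linking to this one.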
        float d = 0.85f;
        float prSum = 0;
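        // All feed-link documents whose target is this feed, i.e. its inlinks.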
        TopDocs links = this.findByTarget(source);
        if(links != null && links.scoreDocs.length > 0)
        {
            int inlinks = links.scoreDocs.length;
            if(inlinks != feedScore.getInlinks())
            {
                log.warn(feedScore.getInlinks() + " vs " + inlinks + " inlinks for "
                        + feedScore.getFeedId() + " => the index is corrupt!");
            }
            for(int i = 0 ; i < inlinks; i++)
            {
                int docId = links.scoreDocs[i].doc;
                Document doc = feedLinkIndexSearcher.doc(docId);
                Long S = Long.valueOf(doc.get("source"));
                FeedScore fs = this.feedScoreMap.get(S);
                if(fs != null)
                {
                    float outlinks = fs.getOutlinks();
                    if(outlinks > 0)
                    {
                        float score = fs.getScore();
                        float pr = score/outlinks;
                        prSum += pr;
                    }
                }
                else
                {
                    log.warn("No FeedScore for " + S + ", the index seems corrupt");
                }
            }
        }
        else if(feedScore.getInlinks() > 0)
        {
            log.warn("No inlinks for " + source + ", the index seems corrupt");
        }

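        // Feed id 0 is a "virtual" feed: it holds the combined rank of all feeds
        // without outlinks (dangling feeds), which gets spread over the other n-1 feeds.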
        FeedScore virtualFeedScore = this.feedScoreMap.get(Long.valueOf(0L));
        float virtualPageRank = virtualFeedScore.getScore();
        //extra to compensate for feeds without outlinks
        if(feedScore.getOutlinks() == 0)
        {
            prSum += (virtualPageRank - feedScore.getScore())/(n-1);
        }
        else
        {
            prSum += virtualPageRank/(n-1);
        }


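        // The PageRank update from the Wikipedia article: PR(A) = (1-d)/N + d * sum(PR(S)/outlinks(S));
        // prSum here also includes the share coming from the virtual feed above.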
        float pr = ((1-d)/n)+(d*prSum);

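        // A dangling feed pushes its freshly computed rank back into the virtual feed.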
        if(feedScore.getOutlinks() == 0)
        {
            virtualPageRank += pr - feedScore.getScore();
            virtualFeedScore.setScore(virtualPageRank);
            feedScoreMap.put(Long.valueOf(0L), virtualFeedScore);
        }

        feedScore.setScore(pr);
        feedScore.setNextCalcDate(nextCalcDate);
        feedScoreMap.put(feedScore.getFeedId(), feedScore);
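        // Running total of all ranks computed in this pass (class-level field).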
        this.prSum += pr;
    }
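
The "run enough iterations" part is basically an outer loop like the sketch
below. This is heavily simplified and NOT our real MR driver; maxIterations
and epsilon are names I made up for this example, and in reality the
per-feed calls are of course spread out over the cluster:

    //Simplified sketch of the iteration driver, not the real MR job:
    //recalculate every feed until the total change between two passes is small enough.
    public void runPageRank(int maxIterations, float epsilon) throws Exception
    {
        for(int iter = 0; iter < maxIterations; iter++)
        {
            float delta = 0;
            for(FeedScore feedScore : feedScoreMap.values())
            {
                if(feedScore.getFeedId() == 0)
                {
                    continue; //skip the virtual feed, it is only updated indirectly
                }
                float before = feedScore.getScore();
                calculatePageRank(feedScore); //the method above
                delta += Math.abs(feedScore.getScore() - before);
            }
            if(delta < epsilon)
            {
                break; //"almost converged"
            }
        }
    }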


Ted:
Don't take it personally that I hammered your baby, and don't get me wrong.
I DO like HBase, but being a Java app it consumed a lot of resources and
gave me too little back when I did my tests last year.
I think HBase is definitely the right way to go (can Google be wrong?), but
at the time it was not snappy enough, and perhaps my head was not really
tuned in on Hadoop either.

I agree that traditional DB solutions that scale UP instead of OUT will not
be doable in the long run, so scaling out is exactly what we are doing:
adding the overhead of knowing what lies where on a master server, and yes,
soon we will need to shard the master itself, haha.
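
To be concrete, the "knowing what lies where" is at its core just a lookup,
roughly like the sketch below (simplified and with made-up names, not our
actual code):

    //Simplified idea of the routing the master does, not our real code.
    //A static modulo works until you need to move data between shards,
    //then you want a lookup table instead.
    public int shardFor(long blogEntryId, int numberOfShards)
    {
        return (int)(blogEntryId % numberOfShards);
    }

    //e.g. with 50 shards, blog entry 90000123 ends up on shard 23:
    //shardFor(90000123L, 50) == 23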

Now I want to ask you a question: what hardware would you use to store
10-100 million blogs and 1-10 billion blog entries and make each findable
within, let's say, 100 msec? I am curious, since what it all comes down to
is money: money to invest in the right hardware to get the most bang for
the buck.
What is needed most for HBase to scale?
Memory?
Total amount of HDFS IO?
CPU?
Too little memory and I guess the load goes IO-bound?

Perhaps we should start a separate thread about that..

Cheers

//Marcus










On Thu, Jul 2, 2009 at 10:34 PM, Mark Kerzner <[email protected]> wrote:

> That's awesome information, Marcus.
> I am working on a project which would require a similar architectural
> solution (although unlike you I can't broadcast the details), so that was
> very useful. One thing I can say though is that mine is in no way a
> competitor, being in a different area.
>
> If I could find out more, that would be even better. For example, how do
> you do PageRank? Although I think I have seen a PageRank algorithm in MR
> somewhere (Google actually playfully revealing the secret), and surely
> Pregel promises this code in 15 lines.
>
> Cheers,
> Mark
>
> On Thu, Jul 2, 2009 at 3:09 PM, Marcus Herou <[email protected]> wrote:
>
> > Hi.
> >
> > Guys whoa slow down! We are on the same side and agree on most things :)
> >
> > We are using hadoop in these cases.
> >
> > BlogSearch:
> > 1. Prospect Crawling: A job exports a few hundred thousand urls from a db
> > into a flat file and then lets a threaded MapRunnable go crazy crawling on
> > that one, saving the fetched data and response to glusterFS during
> > runtime.
> > At the end of each url's crawl the state is then moved from "fetchable" to
> > "parsable" and mapped back into the db. This is done during runtime but
> > should be done afterwards.
> > 2. Prospect Parsing: Again export the urls from the db in question into
> > hadoop where we process them and perform ngram analysis, spam detection,
> > blog detection etc., and if everything goes well mark the url as a valid
> > blog and update the state accordingly, again during runtime (should be
> > done afterwards).
> > 3. Blog Crawling: Basically the same as case 1, but here we fetch the
> > feed urls instead.
> > 4. Blog Parsing: Parse the feed url into a SyndFeed object (rome project)
> > and process the SyndFeed and all its SyndEntries and persist them into a
> > DB during runtime. This could be improved by emitting the entries instead
> > of saving them to a DB at runtime. Each new entry notifies an indexer Q.
> > 5. Indexing: The indexer polls the Q, loads the ids into a threaded
> > MapRunnable and goes crazy updating SOLR.
> >
> > It is basically an extension and refinement of Nutch, where we have
> > refined the plugin architecture to use Spring, created wrappers around
> > some existing plugins and changed the storage solution from the CrawlDB
> > and LinkDB to use a database instead.
> >
> > Log file analysis:
> > We are, among other things, a blog advertising company, so statistics is
> > key. We use Hadoop for 95% of the processing of the incoming access logs
> > (well, not access logs, but very alike) and then emit the data into
> > MonetDB, which has great speed (but bad robustness) for grouping,
> > counting etc.
> >
> > The end users are then using SOLR for searching and our sharded DB
> > solution to fetch the item in question, the blog entry, the blog itself
> > and so on. Perhaps we will use HBase but after testing last year I ruled
> > HBase out due to performance since I got more bang for the buck writing
> > my own arch, sorry Stack...
> >
> > The architecture is quite good but just needs some tuning which you
> > pointed out. Whenever you can emit a record to the outputCollector
> > instead of updating the DB then you should.
> >
> > We now have 90 million blog entries in the DB (in 4 months) and have
> > prospected over a billion urls so we are doing something right I hope.
> >
> > The originating question seems to have gone out of scope, haha.
> >
> > Cheers lads
> >
> > //Marcus
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, Jul 2, 2009 at 5:08 PM, Uri Shani <[email protected]> wrote:
> >
> > > Hi Marcus,
> > > If you need a database to serve as an in-between to your customer, then
> > > you can do a massive load to the DB of the end results, as I suggested.
> > > You can avoid that and work with files only. Then you need to serve your
> > > customers via an API to your hdfs files. Each query will start an M/R
> > > job to get the results.
> > > Don't know the response time, yet it is a viable approach to search.
> > > There are also the hadoop DBs: Hbase and Hive. PIG may also have SQL-like
> > > things in the future, and there is the JAQL project http://www.jaql.org/.
> > >
> > > - Uri
> > >
> > >
> > >
> > > From: Marcus Herou <[email protected]>
> > > To: [email protected]
> > > Date: 02/07/2009 05:50 PM
> > > Subject: Re: Parallell maps
> > >
> > >
> > >
> > > Hi.
> > >
> > > Yep, I reckon this. As I said, I have created a sharded DB solution
> > > where all data is currently spread across 50 shards. I totally agree
> > > that one should try to emit data to the outputCollector, but when one
> > > has the data in memory I do not want to throw it away and re-read it
> > > into memory again later on after the job is done by chained Hadoop
> > > jobs... In our stats system we do exactly this, almost no db all over
> > > the complex chain, just reducing and counting like crazy :)
> > >
> > > There is another pro to following your example, and that is that you
> > > risk less during the actual job since you do not have to create a
> > > db-pool of thousands of connections. And the jobs do not affect the
> > > live production DB = good.
> > >
> > > Anyway, what do you mean by "if you HAVE to load it into a db"? How the
> > > heck are my users gonna access our search engine without accessing a
> > > populated index or DB?
> > > I've been thinking about this in many use-cases, and perhaps I am
> > > thinking totally wrong, but I tend not to be able to implement a
> > > shared-nothing arch all over the chain; it is simply not possible.
> > > Sometimes you need a DB, sometimes you need an index, sometimes a
> > > distributed cache, sometimes a file on local FS/NFS/glusterFS (yes, I
> > > prefer classical mount points over HDFS due to the non-wrapper
> > > characteristics of HDFS and its speed, random access IO that is).
> > >
> > > I will put a mental note to adapt and be more hadoopish :)
> > >
> > > Cheers
> > >
> > > //Marcus
> > >
> > > On Thu, Jul 2, 2009 at 4:27 PM, Uri Shani <[email protected]> wrote:
> > >
> > > > Hi,
> > > > Whenever you try to access a DB in parallel you need to design it for
> > > > that. This means, for instance, that you ensure that each of the
> > > > parallel tasks inserts to a distinct "partition" or table in the
> > > > database to avoid the conflicts and failures. Hadoop does the same in
> > > > its reduce tasks: each reduce gets ALL the records that are needed to
> > > > do its function. So there is a hash mapping keys to these tasks. The
> > > > same principle you need to follow when linking DB partitions to your
> > > > hadoop tasks.
> > > >
> > > > In general, think about how to not use DB inserts from Hadoop.
> > > > Rather, create your results in files.
> > > > At the end of the process, you can - if this is what you HAVE to do -
> > > > massively load all the records into the database using efficient
> > > > loading utilities.
> > > >
> > > > If you need the DB to communicate among your tasks, meaning that you
> > > > need the inserts to be readily available for other threads to select,
> > > > then it is obviously the wrong medium for such sharing and you need to
> > > > look at other solutions to share consistent data among hadoop tasks.
> > > > For instance, zookeeper, etc.
> > > >
> > > > Regards,
> > > > - Uri
> > > >
> > > >
> > > >
> > > > From: Marcus Herou <[email protected]>
> > > > To: common-user <[email protected]>
> > > > Date: 02/07/2009 12:13 PM
> > > > Subject: Parallell maps
> > > >
> > > >
> > > >
> > > > Hi.
> > > >
> > > > I've noticed that hadoop spawns parallel copies of the same task on
> > > > different hosts. I've understood that this is to improve the
> > > > performance of the job by prioritizing fast-running tasks. However,
> > > > since we connect to databases in our jobs, this leads to conflicts
> > > > when inserting, updating, deleting data (duplicated key etc). Yes, I
> > > > know I should consider Hadoop as a "Shared Nothing" architecture, but
> > > > I really must connect to databases in the jobs. I've created a sharded
> > > > DB solution which scales as well, or I would be doomed...
> > > >
> > > > Any hints on how to disable this feature or how to reduce the impact
> > > > of it?
> > > >
> > > > Cheers
> > > >
> > > > /Marcus
> > > >
> > > > --
> > > > Marcus Herou CTO and co-founder Tailsweep AB
> > > > +46702561312
> > > > [email protected]
> > > > http://www.tailsweep.com/
> > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > Marcus Herou CTO and co-founder Tailsweep AB
> > > +46702561312
> > > [email protected]
> > > http://www.tailsweep.com/
> > >
> > >
> > >
> >
> >
> > --
> > Marcus Herou CTO and co-founder Tailsweep AB
> > +46702561312
> > [email protected]
> > http://www.tailsweep.com/
> >
>



-- 
Marcus Herou CTO and co-founder Tailsweep AB
+46702561312
[email protected]
http://www.tailsweep.com/
