Man, scanned through the slides, looks very promising. Great work!
//Marcus

On Fri, Jul 3, 2009 at 9:28 AM, Marcus Herou <[email protected]> wrote:

> Hi.
>
> This is my company, so I reveal what I like, even though the board would
> shoot me, but hey, do you think they are scanning this mailing list? :)
>
> The PR algo is very simple (but clever) and can be found on Wikipedia:
> http://en.wikipedia.org/wiki/PageRank
> What is painful is to calculate it in a distributed architecture. You will
> never achieve your goal by using a DB to store the score per node and the
> links from/to it (we did not, at least).
> We use plain Lucene indexes and 10 memcached servers to store the
> intermediate scores and run enough iterations for the scoring to almost
> converge (it never converges completely).
>
> Here, guys, is a piece of code which I leave for you to examine; we might
> be doing it totally wrong.
>
> // Based on: http://en.wikipedia.org/wiki/PageRank
> // Called from a MR job.
> public void calculatePageRank(FeedScore feedScore) throws Exception
> {
>     long source = feedScore.getFeedId();
>     float d = 0.85f;
>     float prSum = 0;
>     TopDocs links = this.findByTarget(source);
>     if (links != null && links.scoreDocs.length > 0)
>     {
>         int inlinks = links.scoreDocs.length;
>         if (inlinks != feedScore.getInlinks())
>         {
>             log.warn(feedScore.getInlinks() + " vs " + inlinks + " inlinks for "
>                 + feedScore.getFeedId() + " => the index is corrupt!");
>         }
>         for (int i = 0; i < inlinks; i++)
>         {
>             int docId = links.scoreDocs[i].doc;
>             Document doc = feedLinkIndexSearcher.doc(docId);
>             Long S = Long.valueOf(doc.get("source"));
>             FeedScore fs = this.feedScoreMap.get(S);
>             if (fs != null)
>             {
>                 float outlinks = fs.getOutlinks();
>                 if (outlinks > 0)
>                 {
>                     // Each linking feed contributes its score divided by its outlink count.
>                     float score = fs.getScore();
>                     float pr = score / outlinks;
>                     prSum += pr;
>                 }
>             }
>             else
>             {
>                 log.warn("No FeedScore for " + S + ", the index seems corrupt");
>             }
>         }
>     }
>     else if (feedScore.getInlinks() > 0)
>     {
>         log.warn("No inlinks for " + source + ", the index seems corrupt");
>     }
>
>     FeedScore virtualFeedScore = this.feedScoreMap.get(new Long(0));
>     float virtualPageRank = virtualFeedScore.getScore();
>     // Extra contribution from the virtual node (id 0) to compensate for feeds without outlinks.
>     if (feedScore.getOutlinks() == 0)
>     {
>         prSum += (virtualPageRank - feedScore.getScore()) / (n - 1);
>     }
>     else
>     {
>         prSum += virtualPageRank / (n - 1);
>     }
>
>     float pr = ((1 - d) / n) + (d * prSum);
>
>     if (feedScore.getOutlinks() == 0)
>     {
>         virtualPageRank += pr - feedScore.getScore();
>         virtualFeedScore.setScore(virtualPageRank);
>         feedScoreMap.put(new Long(0), virtualFeedScore);
>     }
>
>     feedScore.setScore(pr);
>     feedScore.setNextCalcDate(nextCalcDate);
>     feedScoreMap.put(feedScore.getFeedId(), feedScore);
>     this.prSum += pr;
> }
>
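To make the "run enough iterations for the scoring to almost converge" step concrete, here is a minimal, hypothetical driver sketch in plain Java. It reuses FeedScore, feedScoreMap and calculatePageRank() from the snippet above; the method name, the iteration cap and the epsilon threshold are invented, and in the setup Marcus describes the sweep would of course be a MapReduce pass over the Lucene/memcached data rather than a single in-memory loop.

// Hypothetical outer loop (not from the original mail): sweep all feeds repeatedly
// and stop once a full sweep barely changes the scores. Requires java.util.ArrayList.
public void runPageRank(int maxIterations, float epsilon) throws Exception
{
    for (int iter = 0; iter < maxIterations; iter++)
    {
        float delta = 0f;
        // Copy the values so we do not iterate the live map while calculatePageRank() updates it.
        for (FeedScore feedScore : new ArrayList<FeedScore>(feedScoreMap.values()))
        {
            if (feedScore.getFeedId() == 0)
            {
                continue; // the virtual node (id 0) is only updated as a side effect
            }
            float before = feedScore.getScore();
            calculatePageRank(feedScore);
            delta += Math.abs(feedScore.getScore() - before);
        }
        // "It never converges completely", so settle for a small total change per sweep.
        if (delta < epsilon)
        {
            break;
        }
    }
}
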
> Ted:
> Don't take it personally that I hammered your baby, and don't take me wrong.
> I DO like HBase, but being a Java app it consumed a lot of resources and
> gave me too little back last year when I did my tests.
> I think HBase is definitely the right way to go (can Google be wrong?), but
> at the time it was not snappy enough, and perhaps my head was not really
> tuned in on Hadoop at the time.
>
> I agree that traditional DB solutions which scale UP instead of OUT will
> not be doable in the long run, so that is exactly what we are doing: adding
> the overhead of knowing what lies where on a master server, and yes, soon
> we will need to shard the master itself, haha.
>
> Now I want to ask you a question: what hardware would you use for storing
> 10-100 million blogs and 1-10 billion blog entries and making each findable
> within, let's say, 100 msec? I am curious, since what it all comes down to
> is money: money to invest in the right hardware to get the most bang for
> the buck.
> What is mostly needed for HBase to scale?
> Memory?
> Total amount of HDFS IO?
> CPU?
> Too little memory and I guess the load goes IO-bound?
>
> Perhaps we should start a separate thread about that..
>
> Cheers
>
> //Marcus
>
> On Thu, Jul 2, 2009 at 10:34 PM, Mark Kerzner <[email protected]> wrote:
>
>> That's awesome information, Marcus.
>> I am working on a project which would require a similar architectural
>> solution (although unlike you I can't broadcast the details), so that was
>> very useful. One thing I can say, though, is that mine is in no way a
>> competitor, being in a different area.
>>
>> If I could find out more, that would be even better. For example, how do
>> you do PageRank? Although I think that I have seen a PageRank algorithm in
>> MR somewhere (Google actually playfully revealing the secret), and surely
>> Pregel promises this code in 15 lines.
>>
>> Cheers,
>> Mark
>>
>> On Thu, Jul 2, 2009 at 3:09 PM, Marcus Herou <[email protected]> wrote:
>>
>> > Hi.
>> >
>> > Guys, whoa, slow down! We are on the same side and agree on most things :)
>> >
>> > We are using Hadoop in these cases.
>> >
>> > BlogSearch:
>> > 1. Prospect crawling: a job exports a few hundred thousand URLs from a
>> > DB into a flat file and then lets a threaded MapRunnable go crazy
>> > crawling them, saving the fetched data and response to GlusterFS during
>> > runtime. At the end of each URL's crawl the state is moved from
>> > "fetchable" to "parsable" and mapped back into the DB. This is done
>> > during runtime but should be done afterwards.
>> > 2. Prospect parsing: again export the URLs in question from the DB into
>> > Hadoop, where we process them and perform n-gram analysis, spam
>> > detection, blog detection etc., and if everything goes well mark the URL
>> > as a valid blog and update the state accordingly, again during runtime;
>> > should be done afterwards.
>> > 3. Blog crawling: fetch the feed URLs, basically the same as case 1 but
>> > here we fetch the feed URLs instead.
>> > 4. Blog parsing: parse the feed URL into a SyndFeed object (ROME project)
>> > and process the SyndFeed and all its SyndEntries and persist them into a
>> > DB during runtime. This could be improved by emitting the entries
>> > instead of saving them to a DB at runtime (see the sketch below). Each
>> > new entry notifies an indexer queue.
>> > 5. Indexing: the indexer polls the queue, loads the IDs into a threaded
>> > MapRunnable and goes crazy updating SOLR.
>> >
>> > It is basically an extension and refinement of Nutch, where we have
>> > refined the plugin architecture to use Spring, created wrappers around
>> > some existing plugins and changed the storage solution from the CrawlDB
>> > and LinkDB to use a database instead.
>> >
>> > Log file analysis:
>> > We are, among other things, a blog advertising company, so statistics is
>> > key. We use Hadoop for 95% of the processing of the incoming access logs
>> > (well, not access logs, but very alike) and then emit the data into
>> > MonetDB, which has great speed (but bad robustness) for grouping,
>> > counting etc.
>> >
>> > The end users are then using SOLR for searching and our sharded DB
>> > solution to fetch the item in question: the blog entry, the blog itself
>> > and so on.
>> >
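As a purely illustrative aside on the improvement mentioned in step 4 (emitting parsed entries instead of persisting them from inside the job), a ROME-based map() for the old org.apache.hadoop.mapred API could look roughly like this. This is not the Tailsweep code: the class name, the key/value layout and the counter names are invented; only the ROME and Hadoop calls themselves are real APIs.

import java.io.IOException;
import java.net.URL;

import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.io.SyndFeedInput;
import com.sun.syndication.io.XmlReader;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical sketch: parse one feed URL per input line with ROME and emit each entry
// to the OutputCollector; persisting/indexing happens in a later step or a bulk load.
public class FeedParseMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text>
{
    private final SyndFeedInput input = new SyndFeedInput();

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
        String feedUrl = value.toString().trim();
        try
        {
            SyndFeed feed = input.build(new XmlReader(new URL(feedUrl)));
            for (Object o : feed.getEntries())
            {
                SyndEntry entry = (SyndEntry) o;
                // Key: feed URL. Value: one tab-separated record per entry.
                output.collect(new Text(feedUrl),
                               new Text(entry.getLink() + "\t" + entry.getTitle()));
            }
        }
        catch (Exception e)
        {
            // Count failures instead of failing the whole task on one bad feed.
            reporter.incrCounter("feeds", "parse_failures", 1);
        }
    }
}
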
>> > Perhaps we will use HBase, but after testing last year I ruled HBase
>> > out due to performance, since I got more bang for the buck writing my
>> > own arch. Sorry Stack...
>> >
>> > The architecture is quite good but just needs some tuning, which you
>> > pointed out. Whenever you can emit a record to the OutputCollector
>> > instead of updating the DB, you should.
>> >
>> > We now have 90 million blog entries in the DB (in 4 months) and have
>> > prospected over a billion URLs, so we are doing something right, I hope.
>> >
>> > The originating question seems to have gone out of scope, haha.
>> >
>> > Cheers lads
>> >
>> > //Marcus
>> >
>> > On Thu, Jul 2, 2009 at 5:08 PM, Uri Shani <[email protected]> wrote:
>> >
>> > > Hi Marcus,
>> > > If you need a database to serve as an in-between to your customers,
>> > > then you can do a massive load of the end results into the DB, as I
>> > > suggested.
>> > > You can avoid that and work with files only. Then you need to serve
>> > > your customers via an API to your HDFS files. Each query will start a
>> > > M/R job to get the results.
>> > > I don't know the response time - yet it is a viable approach to search.
>> > > There are also the Hadoop DBs: HBase and Hive. Pig may also have
>> > > SQL-like things in the future, and there is the JAQL project
>> > > http://www.jaql.org/.
>> > >
>> > > - Uri
>> > >
>> > > From: Marcus Herou <[email protected]>
>> > > To: [email protected]
>> > > Date: 02/07/2009 05:50 PM
>> > > Subject: Re: Parallell maps
>> > >
>> > > Hi.
>> > >
>> > > Yep, I reckon this. As I said, I have created a sharded DB solution
>> > > where all data is currently spread across 50 shards. I totally agree
>> > > that one should try to emit data to the OutputCollector, but when one
>> > > has the data in memory I do not want to throw it away and re-read it
>> > > into memory again later on, after the job is done, by chained Hadoop
>> > > jobs... In our stats system we do exactly this: almost no DB anywhere
>> > > in the complex chain, just reducing and counting like crazy :)
>> > >
>> > > There is another pro to following your example, and that is that you
>> > > risk less during the actual job, since you do not have to create a DB
>> > > pool of thousands of connections. And the jobs do not affect the live
>> > > production DB = good.
>> > >
>> > > Anyway, what do you mean by "if you HAVE to load it into a DB"? How
>> > > the heck are my users going to access our search engine without
>> > > accessing a populated index or DB?
>> > > I've been thinking about this in many use cases, and perhaps I am
>> > > thinking totally wrong, but I tend not to be able to implement a
>> > > shared-nothing arch all over the chain; it is simply not possible.
>> > > Sometimes you need a DB, sometimes you need an index, sometimes a
>> > > distributed cache, sometimes a file on a local FS/NFS/GlusterFS (yes,
>> > > I prefer classical mount points over HDFS due to the non-wrapper
>> > > characteristics of HDFS and its speed; random-access IO, that is).
>> > >
>> > > I will put a mental note to adapt and be more hadoopish :)
>> > >
>> > > Cheers
>> > >
>> > > //Marcus
>> > >
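Both mails above circle around the same pattern: write the results to files from the job and only touch the database afterwards with a bulk loader. As a hypothetical illustration of that post-job step for one MySQL shard (table, columns, credentials and the file path are all made up, and LOAD DATA LOCAL INFILE has to be enabled on both server and driver), it could look something like this:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical post-job bulk load of one tab-separated part file into one shard,
// instead of row-by-row INSERTs (and a big connection pool) inside the tasks.
public class BulkLoadResults
{
    public static void main(String[] args) throws Exception
    {
        String jdbcUrl = args[0];   // e.g. jdbc:mysql://shard01/blogsearch (made up)
        String localFile = args[1]; // a part-00000 file copied out of HDFS

        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection(jdbcUrl, "user", "secret");
        try
        {
            Statement st = con.createStatement();
            // One bulk statement per output file is far cheaper than thousands of
            // INSERTs issued from inside map/reduce tasks.
            st.execute("LOAD DATA LOCAL INFILE '" + localFile + "' "
                     + "INTO TABLE blog_entry_score FIELDS TERMINATED BY '\\t' "
                     + "(feed_id, score)");
            st.close();
        }
        finally
        {
            con.close();
        }
    }
}
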
>> > > On Thu, Jul 2, 2009 at 4:27 PM, Uri Shani <[email protected]> wrote:
>> > >
>> > > > Hi,
>> > > > Whenever you try to access a DB in parallel you need to design it
>> > > > for that. This means, for instance, that you ensure that each of the
>> > > > parallel tasks inserts into a distinct "partition" or table in the
>> > > > database, to avoid the conflicts and failures. Hadoop does the same
>> > > > in its reduce tasks - each reduce gets ALL the records that are
>> > > > needed to do its function, so there is a hash mapping keys to these
>> > > > tasks. You need to follow the same principle when linking DB
>> > > > partitions to your Hadoop tasks.
>> > > >
>> > > > In general, think about how not to use DB inserts from Hadoop.
>> > > > Rather, create your results in files.
>> > > > At the end of the process you can - if this is what you HAVE to do -
>> > > > massively load all the records into the database using efficient
>> > > > loading utilities.
>> > > >
>> > > > If you need the DB to communicate among your tasks, meaning that you
>> > > > need the inserts to be readily available for other threads to
>> > > > select, then it is obviously the wrong medium for such sharing and
>> > > > you need to look at other solutions to share consistent data among
>> > > > Hadoop tasks, for instance ZooKeeper, etc.
>> > > >
>> > > > Regards,
>> > > > - Uri
>> > > >
>> > > > From: Marcus Herou <[email protected]>
>> > > > To: common-user <[email protected]>
>> > > > Date: 02/07/2009 12:13 PM
>> > > > Subject: Parallell maps
>> > > >
>> > > > Hi.
>> > > >
>> > > > I've noticed that Hadoop spawns parallel copies of the same task on
>> > > > different hosts. I've understood that this is done to improve the
>> > > > performance of the job by prioritizing fast-running tasks. However,
>> > > > since we connect to databases in our jobs, this leads to conflicts
>> > > > when inserting, updating and deleting data (duplicate keys etc).
>> > > > Yes, I know I should consider Hadoop a "Shared Nothing"
>> > > > architecture, but I really must connect to databases in the jobs.
>> > > > I've created a sharded DB solution which scales as well, or I would
>> > > > be doomed...
>> > > >
>> > > > Any hints on how to disable this feature or how to reduce the impact
>> > > > of it?
>> > > >
>> > > > Cheers
>> > > >
>> > > > /Marcus
>> > > >
>> > > > --
>> > > > Marcus Herou CTO and co-founder Tailsweep AB
>> > > > +46702561312
>> > > > [email protected]
>> > > > http://www.tailsweep.com/

--
Marcus Herou CTO and co-founder Tailsweep AB
+46702561312
[email protected]
http://www.tailsweep.com/
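Finally, tying Uri's partitioning advice back to the question that started the thread: a hypothetical sketch of routing each key to the reduce task that owns its DB shard, and switching off speculative execution so duplicate task attempts stop causing duplicate-key errors. The class and the shard-by-id rule are invented; the Partitioner interface and the JobConf speculative-execution switches are the old mapred API.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Hypothetical: one reduce task per DB shard, keys routed by id, so each shard is only
// ever written by a single task.
public class ShardPartitioner implements Partitioner<LongWritable, Text>
{
    public void configure(JobConf job)
    {
        // nothing to configure in this sketch
    }

    public int getPartition(LongWritable key, Text value, int numReduceTasks)
    {
        // Mask the sign bit so negative ids still map to a valid partition.
        return (int) ((key.get() & Long.MAX_VALUE) % numReduceTasks);
    }

    // Sketch of the relevant job setup.
    public static void configureJob(JobConf conf, int numberOfShards)
    {
        conf.setPartitionerClass(ShardPartitioner.class);
        conf.setNumReduceTasks(numberOfShards); // one reducer per DB shard
        // Stop Hadoop from running duplicate (speculative) attempts of the same task,
        // which is what turns DB writes into duplicate-key conflicts.
        conf.setMapSpeculativeExecution(false);
        conf.setReduceSpeculativeExecution(false);
    }
}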
