[ https://issues.apache.org/jira/browse/HIVE-917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12830012#action_12830012 ]
Namit Jain commented on HIVE-917: --------------------------------- BucketMapJoinOptimizer.java: 80 wrong comment: // process group-by pattern can you add a correct comment ? // mapper. That means there is not reducer between the root table scan and change to: // mapper. That means there is no reducer between the root table scan and Add some comments in private boolean checkBucketColumns(List<String> bucketColumns, MapJoinDesc mjDesc, int index) { A mapjoin B where A is the big table and partitioned should be optimized B is not partitioned (assuming both A and B are bucketed) if(partNumber == 0) { Integer num = new Integer(0); bucketNumbers.add(num); aliasToBucketNumber.put(alias, num); aliasToBucketFileNames.put(alias, new ArrayList<String>()); no need to do this - anyway, the results are empty ExecMapper: if(bucketMatcherCls == null) { bucketMatcherCls = org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class; } Add the class name in mapredlocalwork and initialize it using reflection Keep file name to file name mapping in mapredlocalwork (only useful for bucketed map join - not for skew join) MapredLocalWork: private LinkedHashMap<String, Integer> aliasToBucketNumber; private LinkedHashMap<String, List<String>> aliasToBucketFileNames; private String mapJoinBigTableAlias; private Class<? extends BucketMatcher> bucketMatcker; create a new class for the above public Class<? extends BucketMatcher> getBucketMatcker() { return bucketMatcker; } public void setBucketMatcker(Class<? extends BucketMatcher> bucketMatcker) { this.bucketMatcker = bucketMatcker; } spelling: should be Matcher DefaultBucketMatcher: public List<Path> getAliasBucketFiles(String refTableInputFile, String refTableAlias, String alias) { int bigTblBucketNum = aliasToBucketNumber.get(refTableAlias); int smallTblBucketNum = aliasToBucketNumber.get(alias); Collections.sort(aliasToBucketFileNames.get(refTableAlias)); Collections.sort(aliasToBucketFileNames.get(alias)); List<Path> resultFileNames = new ArrayList<Path>(); if (bigTblBucketNum >= smallTblBucketNum) { int temp = bigTblBucketNum / smallTblBucketNum; int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile); int toAddSmallIndex = index/temp; if(toAddSmallIndex < aliasToBucketFileNames.get(alias).size()) { resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(toAddSmallIndex))); } } else { int jump = smallTblBucketNum / bigTblBucketNum; int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile); for (int i = index; i < aliasToBucketFileNames.get(alias).size(); i = i + jump) { if(i <= aliasToBucketFileNames.get(alias).size()) { resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(i))); } } } return resultFileNames; } move this to compile time and add some more comments FetchOperator.java: 6 boolean ret = false; 267 try { 268 value = currRecReader.createValue(); 269 ret = currRecReader.next(key, value); 270 } catch (Exception e) { 271 e.printStackTrace(); 272 } > Bucketed Map Join > ----------------- > > Key: HIVE-917 > URL: https://issues.apache.org/jira/browse/HIVE-917 > Project: Hadoop Hive > Issue Type: New Feature > Reporter: Zheng Shao > Attachments: hive-917-2010-2-3.patch > > > Hive already have support for map-join. Map-join treats the big table as job > input, and in each mapper, it loads all data from a small table. > In case the big table is already bucketed on the join key, we don't have to > load the whole small table in each of the mappers. This will greatly > alleviate the memory pressure, and make map-join work with medium-sized > tables. > There are 4 steps we can improve: > S0. This is what the user can already do now: create a new bucketed table and > insert all data from the small table to it; Submit BUCKETNUM jobs, each doing > a map-side join of "bigtable TABLEPARTITION(BUCKET i OUT OF NBUCKETS)" with > "smallbucketedtable TABLEPARTITION(BUCKET i OUT OF NBUCKETS)". > S1. Change the code so that when map-join is loading the small table, we > automatically drop the rows with the keys that are NOT in the same bucket as > the big table. This should alleviate the problem on memory, but we might > still have thousands of mappers reading the whole of the small table. > S2. Let's say the user already bucketed the small table on the join key into > exactly the same number of buckets (or a factor of the buckets of the big > table), then map-join can choose to load only the buckets that are useful. > S3. Add a new hint (e.g. /*+ MAPBUCKETJOIN(a) */), so that Hive automatically > does S2, without the need of asking the user to create temporary bucketed > table for the small table. -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.