[ 
https://issues.apache.org/jira/browse/HIVE-917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12830012#action_12830012
 ] 

Namit Jain commented on HIVE-917:
---------------------------------

BucketMapJoinOptimizer.java: 80
wrong comment:
    // process group-by pattern

Can you add a correct comment?



    // mapper. That means there is not reducer between the root table scan and

change to:

    // mapper. That means there is no reducer between the root table scan and



Add some comments in
    private boolean checkBucketColumns(List<String> bucketColumns, MapJoinDesc mjDesc, int index) {


A mapjoin B, where A is the big table and is partitioned and B is not
partitioned, should be optimized (assuming both A and B are bucketed).


          if(partNumber == 0) {
            Integer num = new Integer(0);
            bucketNumbers.add(num);
            aliasToBucketNumber.put(alias, num);
            aliasToBucketFileNames.put(alias, new ArrayList<String>());



There is no need to do this; the results are empty anyway.


ExecMapper:

    if(bucketMatcherCls == null) {
      bucketMatcherCls = org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class;
    }


Add the class name to MapredLocalWork and initialize it using reflection.
Keep the file-name-to-file-name mapping in MapredLocalWork (only useful for
bucketed map join, not for skew join).
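
To illustrate the reflection suggestion, here is a minimal sketch that stores only the matcher's class name and instantiates it when needed. The BucketMatcher interface, its single method, and the BucketMatcherLoading wrapper are simplified, hypothetical stand-ins rather than the classes in the patch.

```java
import java.util.Collections;
import java.util.List;

class BucketMatcherLoading {

  /** Hypothetical, simplified stand-in for the BucketMatcher interface. */
  interface BucketMatcher {
    List<String> getAliasBucketFiles(String bigTableFile);
  }

  /** Hypothetical default implementation; it matches no files. */
  static class DefaultBucketMatcher implements BucketMatcher {
    @Override
    public List<String> getAliasBucketFiles(String bigTableFile) {
      return Collections.emptyList();
    }
  }

  /** Instantiates the matcher named in the plan, or the default if none is set. */
  static BucketMatcher newMatcher(String className) {
    if (className == null) {
      return new DefaultBucketMatcher();
    }
    try {
      Class<? extends BucketMatcher> cls =
          Class.forName(className).asSubclass(BucketMatcher.class);
      return cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot create bucket matcher " + className, e);
    }
  }

  public static void main(String[] args) {
    BucketMatcher m = newMatcher(DefaultBucketMatcher.class.getName());
    System.out.println(m.getClass().getSimpleName());
  }
}
```

Storing a String instead of a Class object keeps the plan object simple to serialize, and the fallback to the default stays in one place.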


MapredLocalWork:


  private LinkedHashMap<String, Integer> aliasToBucketNumber;
  private LinkedHashMap<String, List<String>> aliasToBucketFileNames;
  private String mapJoinBigTableAlias;
  private Class<? extends BucketMatcher> bucketMatcker;


create a new class for the above
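
A rough sketch of what such a class could look like, keeping bean-style getters and setters. The name BucketMapJoinContext is only a suggestion, and the matcher is held as a class name on the assumption that it will be created via reflection as proposed for ExecMapper.

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;

/** Hypothetical holder for the bucketed-map-join state of MapredLocalWork. */
class BucketMapJoinContext implements Serializable {
  private static final long serialVersionUID = 1L;

  private LinkedHashMap<String, Integer> aliasToBucketNumber =
      new LinkedHashMap<String, Integer>();
  private LinkedHashMap<String, List<String>> aliasToBucketFileNames =
      new LinkedHashMap<String, List<String>>();
  private String mapJoinBigTableAlias;
  // Assumption: store the class name and instantiate it by reflection.
  private String bucketMatcherClassName;

  public LinkedHashMap<String, Integer> getAliasToBucketNumber() {
    return aliasToBucketNumber;
  }

  public void setAliasToBucketNumber(LinkedHashMap<String, Integer> m) {
    this.aliasToBucketNumber = m;
  }

  public LinkedHashMap<String, List<String>> getAliasToBucketFileNames() {
    return aliasToBucketFileNames;
  }

  public void setAliasToBucketFileNames(LinkedHashMap<String, List<String>> m) {
    this.aliasToBucketFileNames = m;
  }

  public String getMapJoinBigTableAlias() {
    return mapJoinBigTableAlias;
  }

  public void setMapJoinBigTableAlias(String alias) {
    this.mapJoinBigTableAlias = alias;
  }

  public String getBucketMatcherClassName() {
    return bucketMatcherClassName;
  }

  public void setBucketMatcherClassName(String className) {
    this.bucketMatcherClassName = className;
  }
}
```

MapredLocalWork would then carry a single field of this type, left null when the join is not a bucketed map join.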


  public Class<? extends BucketMatcher> getBucketMatcker() {
    return bucketMatcker;
  }

  public void setBucketMatcker(Class<? extends BucketMatcher> bucketMatcker) {
    this.bucketMatcker = bucketMatcker;
  }

spelling: should be Matcher


DefaultBucketMatcher:

  public List<Path> getAliasBucketFiles(String refTableInputFile, String refTableAlias, String alias) {
    int bigTblBucketNum = aliasToBucketNumber.get(refTableAlias);
    int smallTblBucketNum = aliasToBucketNumber.get(alias);
    Collections.sort(aliasToBucketFileNames.get(refTableAlias));
    Collections.sort(aliasToBucketFileNames.get(alias));

    List<Path> resultFileNames = new ArrayList<Path>();
    if (bigTblBucketNum >= smallTblBucketNum) {
      int temp = bigTblBucketNum / smallTblBucketNum;
      int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile);
      int toAddSmallIndex = index/temp;
      if(toAddSmallIndex < aliasToBucketFileNames.get(alias).size()) {
        resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(toAddSmallIndex)));
      }
    } else {
      int jump = smallTblBucketNum / bigTblBucketNum;
      int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile);
      for (int i = index; i < aliasToBucketFileNames.get(alias).size(); i = i + jump) {
        if(i <= aliasToBucketFileNames.get(alias).size()) {
          resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(i)));
        }
      }
    }
    return resultFileNames;
  }



move this to compile time and add some more comments
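
One possible shape for the compile-time version: compute, once, the list of small-table bucket files for every big-table bucket file and store that map in the plan. This is a sketch under stated assumptions, not the patch's logic. It assumes one file per bucket, that sorted file names follow bucket order, that one bucket count is a multiple of the other, and that a row's bucket is its hash modulo the bucket count. Under that last assumption it uses modulo and a stride equal to the big table's bucket count, which differs from the division and jump in the quoted method. BucketFileMapping and compute are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BucketFileMapping {

  /** Maps each big-table bucket file to the small-table bucket files to load. */
  static Map<String, List<String>> compute(List<String> bigFiles, List<String> smallFiles) {
    // Sort copies so the callers' lists are not modified.
    List<String> big = new ArrayList<String>(bigFiles);
    List<String> small = new ArrayList<String>(smallFiles);
    Collections.sort(big);
    Collections.sort(small);

    Map<String, List<String>> mapping = new LinkedHashMap<String, List<String>>();
    for (int b = 0; b < big.size(); b++) {
      List<String> matches = new ArrayList<String>();
      if (!small.isEmpty()) {
        if (big.size() >= small.size()) {
          // Big bucket b only holds keys that fall in small bucket (b mod smallCount).
          matches.add(small.get(b % small.size()));
        } else {
          // Big bucket b holds keys from small buckets b, b + bigCount, b + 2 * bigCount, ...
          for (int s = b; s < small.size(); s += big.size()) {
            matches.add(small.get(s));
          }
        }
      }
      mapping.put(big.get(b), matches);
    }
    return mapping;
  }

  public static void main(String[] args) {
    List<String> big = new ArrayList<String>();
    List<String> small = new ArrayList<String>();
    for (int i = 0; i < 4; i++) {
      big.add("big_" + i);
    }
    for (int i = 0; i < 2; i++) {
      small.add("small_" + i);
    }
    System.out.println(compute(big, small));
  }
}
```

With such a map in the plan, the mapper only needs a lookup keyed by its input file, and no sorting happens at run time.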



FetchOperator.java:

    boolean ret = false;
    try {
      value = currRecReader.createValue();
      ret = currRecReader.next(key, value);
    } catch (Exception e) {
      e.printStackTrace();
    }

> Bucketed Map Join
> -----------------
>
>                 Key: HIVE-917
>                 URL: https://issues.apache.org/jira/browse/HIVE-917
>             Project: Hadoop Hive
>          Issue Type: New Feature
>            Reporter: Zheng Shao
>         Attachments: hive-917-2010-2-3.patch
>
>
> Hive already has support for map-join. Map-join treats the big table as job 
> input, and in each mapper, it loads all data from a small table.
> In case the big table is already bucketed on the join key, we don't have to 
> load the whole small table in each of the mappers. This will greatly 
> alleviate the memory pressure, and make map-join work with medium-sized 
> tables.
> There are 4 steps we can improve:
> S0. This is what the user can already do now: create a new bucketed table and 
> insert all data from the small table to it; Submit BUCKETNUM jobs, each doing 
> a map-side join of "bigtable TABLEPARTITION(BUCKET i OUT OF NBUCKETS)" with 
> "smallbucketedtable TABLEPARTITION(BUCKET i OUT OF NBUCKETS)".
> S1. Change the code so that when map-join is loading the small table, we 
> automatically drop the rows with the keys that are NOT in the same bucket as 
> the big table. This should alleviate the problem on memory, but we might 
> still have thousands of mappers reading the whole of the small table.
> S2. Let's say the user already bucketed the small table on the join key into 
> exactly the same number of buckets (or a factor of the buckets of the big 
> table), then map-join can choose to load only the buckets that are useful.
> S3. Add a new hint (e.g. /*+ MAPBUCKETJOIN(a) */), so that Hive automatically 
> does S2, without the need of asking the user to create temporary bucketed 
> table for the small table.
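
As a small illustration of step S1 in the description above, the row filter could look roughly like this. It assumes buckets are assigned as the non-negative hash of the join key modulo the bucket count; BucketFilter, bucketOf and keepRow are hypothetical names, and a real implementation would have to use the same hash function that was used to bucket the table.

```java
class BucketFilter {

  /** Bucket of a key, assuming non-negative hash modulo the bucket count. */
  static int bucketOf(Object key, int numBuckets) {
    return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  /** S1: keep a small-table row only if its key falls in this mapper's big-table bucket. */
  static boolean keepRow(Object key, int bigTableBucket, int numBuckets) {
    return bucketOf(key, numBuckets) == bigTableBucket;
  }

  public static void main(String[] args) {
    // A mapper reading big-table bucket 1 of 4 keeps key 5 and drops key 6.
    System.out.println(keepRow(Integer.valueOf(5), 1, 4));
    System.out.println(keepRow(Integer.valueOf(6), 1, 4));
  }
}
```

This only reduces memory use; as the description notes, every mapper still reads the whole small table, which is what S2 avoids.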

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
