I'm wondering if it's possible to get a sort-merge bucketized join to work when outer joining to a partitioned table. My goal is to be able to do an outer join from a table with 10 million rows to one with 8 billion rows. The specific problem I'm having now is that while Hive generates an SMJ query plan when outer joining to a table with one (bucketized) partition, it doesn't seem possible to get an SMJ query plan if the large/outer/streamed table has a second partition.

Is this a limitation of the feature, or is it possible to achieve an SMJ to a partitioned table?
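For reference, this is roughly the shape of the statement I'm trying to get planned as an SMJ (the table names small_t and big_t are just placeholders; the full reproduction script is under DETAILS below):

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

-- small_t and big_t are both CLUSTERED BY(word) SORTED BY(word) INTO the same number of buckets;
-- big_t is the partitioned, multi-billion-row side.
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM small_t a
LEFT OUTER JOIN big_t b ON ( a.word = b.word );

With a single partition on the big table, EXPLAIN shows the "Sorted Merge Bucket Map Join Operator"; with a second partition it falls back to a common (reduce-side) join, as shown in the steps below.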
DETAILS

--
-- Environment
--
Hadoop 0.20.2
Hive derived from svn ~8/20/10

--
-- Useful links
--
-- sorted merge join (last updated 3/4/10)
https://issues.apache.org/jira/browse/HIVE-1194
-- join operators
https://issues.apache.org/jira/browse/HIVE-741

--
-- Steps
--
The following steps reproduce the issue.

--
-- Generate test data
--
hadoop fs -put /hadoop/conf /test_data_raw
/hadoop/bin/hadoop jar /hadoop/hadoop-*-examples.jar grep /test_data_raw /test_data_freq '\w+'

-- Cleanup/review data
hadoop fs -rmr /test_data_freq/_logs
hadoop fs -cat /test_data_freq/*

--
-- Create an external table over the source data
--
CREATE EXTERNAL TABLE v_new_data ( cnt STRING, word STRING )
COMMENT 'Use this to pull data easily into Hive (managed) tables'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/test_data_freq';

select * from v_new_data;

--
-- Minor side issue: CLUSTERED BY alone does not have the same effect on the table description as CLUSTERED BY plus SORTED BY.
--
This page says "Cluster By is a short-cut for both Distribute By and Sort By":
http://wiki.apache.org/hadoop/Hive/LanguageManual/SortBy
However, the following shows a different table description when CLUSTERED BY is used without SORTED BY. The differences include "sortCols" and "SORTBUCKETCOLSPREFIX", which seem very relevant for SMJ, so SORTED BY will be used.

set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

DROP TABLE d_test;
CREATE TABLE d_test ( cnt STRING, word STRING )
PARTITIONED BY(job STRING)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;

DROP TABLE d_test2;
CREATE TABLE d_test2 ( cnt STRING, word STRING )
PARTITIONED BY(job STRING)
CLUSTERED BY(word) INTO 4 BUCKETS;

hive> describe extended d_test;
Detailed Table Information Table(tableName:d_test, dbName:default, owner:root, createTime:1283277354, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null), FieldSchema(name:word, type:string, comment:null)], location:hdfs://pos01n:54310/user/hive/warehouse/d_test, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[word], sortCols:[Order(col:word, order:1)], parameters:{}), partitionKeys:[FieldSchema(name:job, type:string, comment:null)], parameters:{SORTBUCKETCOLSPREFIX=TRUE, transient_lastDdlTime=1283277354}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
Time taken: 0.055 seconds

hive> describe extended d_test2;
OK
cnt     string
word    string
job     string

Detailed Table Information Table(tableName:d_test2, dbName:default, owner:root, createTime:1283277354, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null), FieldSchema(name:word, type:string, comment:null)], location:hdfs://pos01n:54310/user/hive/warehouse/d_test2, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[word], sortCols:[], parameters:{}), partitionKeys:[FieldSchema(name:job, type:string, comment:null)], parameters:{transient_lastDdlTime=1283277354}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
Time taken: 0.051 seconds

DROP TABLE d_test2;

--
-- Populate the RIGHT/OUTER TABLE
--
FROM v_new_data
INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache_file')
SELECT cnt, word
WHERE word != 'apache' and word != 'file';

--
-- Create the LEFT/INNER table
--
CREATE TABLE j_test ( cnt STRING, word STRING )
PARTITIONED BY(job STRING)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;

FROM v_new_data
INSERT OVERWRITE TABLE j_test PARTITION(job='from')
SELECT cnt, word
WHERE word = 'apache' or word = 'file' or word = 'name' or word = 'property';

--
-- Create a table to hold the results
--
DROP TABLE r_test;
CREATE TABLE r_test ( cnt STRING, word STRING )
PARTITIONED BY(job STRING)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;
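Before running the join, it helps to confirm that the inserts above actually produced bucketed data: listing each partition directory should show four files, one per bucket. The paths below assume the default warehouse location shown in the describe extended output above, so adjust them if your warehouse lives elsewhere.

-- Sanity check: each partition directory should contain 4 bucket files
-- (paths assume the default /user/hive/warehouse location)
hadoop fs -ls /user/hive/warehouse/d_test/job=no_apache_file
hadoop fs -ls /user/hive/warehouse/j_test/job=from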
--
-- Perform the sort-merge bucketized outer join
--

-- GOOD: This shows "Sorted Merge Bucket Map Join Operator"
EXPLAIN
INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );

-- This works
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );

-- Results:
-- file      NULL
-- property  134
-- apache    NULL
-- name      136

--
-- Add a partition
--
FROM v_new_data
INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache')
SELECT cnt, word
WHERE word != 'apache';

-- BAD: This does not show "Sorted Merge Bucket Map Join Operator", only "Common Join Operator"
EXPLAIN
INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );

NOTE: The reduce-side join will succeed, but only with the small data set. The bad (reduce-side) join, combined with a larger data set (e.g. 45M rows, 4 GB of data, in a table with 2 bucketized partitions), NEVER even completes a single mapper.

Of course, I'd appreciate any help that can be provided. I'm impressed with Hive so far and hope to use it to replace an RDBMS for "real" ETL, but that's only possible if I can get this sort of operation (outer joining from 10M rows to 10B rows) to work. Thanks for the work that's been done on Hive so far, and for any help that anyone can offer me in this situation.