You are right: Hive's SMJ does not support joining across multiple partitions. For your case, is it possible to work around this while still using SMJ? For example, by creating external tables over the individual partitions, or by unioning the per-partition join results.
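To make the union idea concrete, a hypothetical (untested) sketch along these lines might keep each join on the SMJ path, since every branch touches only one bucketized partition of d_test. The partition name literals below are taken from your example; note that a j_test row with no match in either partition would surface once per branch, so the result may need de-duplication:

```sql
-- Untested sketch: one SMJ-eligible join per partition of the large table,
-- with the partition predicate in the ON clause to preserve outer-join
-- semantics, then UNION ALL of the per-partition results.
INSERT OVERWRITE TABLE r_test PARTITION(job='result_union')
SELECT u.word, u.cnt
FROM (
  SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
  FROM j_test a LEFT OUTER JOIN d_test b
    ON ( a.word = b.word AND b.job = 'no_apache_file' )
  UNION ALL
  SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
  FROM j_test a LEFT OUTER JOIN d_test b
    ON ( a.word = b.word AND b.job = 'no_apache' )
) u;
```

Whether the optimizer actually prunes to a single partition here (and so produces the SMJ plan) would need to be verified with EXPLAIN on your build.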
On Tue, Aug 31, 2010 at 8:53 PM, phil young <phil.wills.yo...@gmail.com> wrote:
> I'm wondering if it's possible to get a sort-merge bucketized join to work
> when outering to a partitioned table.
> My goal is to be able to do an outer join from a table with 10 million rows
> to one with 8 billion rows.
> The specific problem I'm having now is that while Hive generates an SMJ
> query plan when outering to a table with one (bucketized) partition,
> it doesn't seem possible to get an SMJ query plan if the
> large/outer/streamed table has a second partition.
> Is this a limitation of the feature, or is it possible to achieve an SMJ to a
> partitioned table?
>
> DETAILS
>
> --
> -- Environment
> --
> Hadoop 0.20.2
> Hive derived from svn ~8/20/10
>
> --
> -- Useful links
> --
> -- sorted merge join (last updated 3/4/10)
> https://issues.apache.org/jira/browse/HIVE-1194
> -- join operators
> https://issues.apache.org/jira/browse/HIVE-741
>
> --
> -- Steps
> --
> The following steps reproduce the issue.
>
> --
> -- Generate test data
> --
> hadoop fs -put /hadoop/conf /test_data_raw
> /hadoop/bin/hadoop jar /hadoop/hadoop-*-examples.jar grep /test_data_raw /test_data_freq '\w+'
>
> -- Cleanup/review data
> hadoop fs -rmr /test_data_freq/_logs
> hadoop fs -cat /test_data_freq/*
>
> --
> -- Create an external table over the source data
> --
> CREATE EXTERNAL TABLE v_new_data
> ( cnt STRING,
>   word STRING
> )
> COMMENT 'Use this to pull data easily into Hive (managed) tables'
> ROW FORMAT
>   DELIMITED FIELDS TERMINATED BY '\t'
> LOCATION '/test_data_freq';
>
> select * from v_new_data;
>
> --
> -- Minor side issue: CLUSTER BY alone does not produce the same table
> -- description as CLUSTER BY with SORT BY.
> --
> This says: "Cluster By is a short-cut for both Distribute By and Sort By"
> http://wiki.apache.org/hadoop/Hive/LanguageManual/SortBy
> However, the following shows a different table description when CLUSTER BY is
> used without SORT BY.
> The differences include "sortCols" and "SORTBUCKETCOLSPREFIX", which
> seem very relevant for SMJ, so SORT BY will be used.
>
> set hive.enforce.bucketing=true;
> set hive.enforce.sorting=true;
> set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
> set hive.optimize.bucketmapjoin = true;
> set hive.optimize.bucketmapjoin.sortedmerge = true;
>
> DROP TABLE d_test;
> CREATE TABLE d_test
> ( cnt STRING,
>   word STRING
> )
> PARTITIONED BY(job STRING)
> CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;
>
> DROP TABLE d_test2;
> CREATE TABLE d_test2
> ( cnt STRING,
>   word STRING
> )
> PARTITIONED BY(job STRING)
> CLUSTERED BY(word) INTO 4 BUCKETS;
>
> Detailed Table Information	Table(tableName:d_test, dbName:default,
> owner:root, createTime:1283277354, lastAccessTime:0, retention:0,
> sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null),
> FieldSchema(name:word, type:string, comment:null)],
> location:hdfs://pos01n:54310/user/hive/warehouse/d_test,
> inputFormat:org.apache.hadoop.mapred.TextInputFormat,
> outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,
> compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null,
> serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
> parameters:{serialization.format=1}), bucketCols:[word],
> sortCols:[Order(col:word, order:1)], parameters:{}),
> partitionKeys:[FieldSchema(name:job, type:string, comment:null)],
> parameters:{SORTBUCKETCOLSPREFIX=TRUE, transient_lastDdlTime=1283277354},
> viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
> Time taken: 0.055 seconds
>
> hive> describe extended d_test2;
> OK
> cnt	string
> word	string
> job	string
>
> Detailed Table Information	Table(tableName:d_test2, dbName:default,
> owner:root, createTime:1283277354, lastAccessTime:0, retention:0,
> sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null),
> FieldSchema(name:word, type:string, comment:null)],
> location:hdfs://pos01n:54310/user/hive/warehouse/d_test2,
> inputFormat:org.apache.hadoop.mapred.TextInputFormat,
> outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,
> compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null,
> serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
> parameters:{serialization.format=1}), bucketCols:[word], sortCols:[],
> parameters:{}), partitionKeys:[FieldSchema(name:job, type:string,
> comment:null)], parameters:{transient_lastDdlTime=1283277354},
> viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
> Time taken: 0.051 seconds
>
> DROP TABLE d_test2;
>
> --
> -- Populate the RIGHT/OUTER table
> --
> FROM v_new_data
> INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache_file')
> SELECT
>   cnt,
>   word
> WHERE
>   word != 'apache'
>   and word != 'file';
>
> --
> -- Create the LEFT/INNER table
> --
> CREATE TABLE j_test
> ( cnt STRING,
>   word STRING
> )
> PARTITIONED BY(job STRING)
> CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;
>
> FROM v_new_data
> INSERT OVERWRITE TABLE j_test PARTITION(job='from')
> SELECT
>   cnt,
>   word
> WHERE
>   word = 'apache'
>   or word = 'file'
>   or word = 'name'
>   or word = 'property'
> ;
>
> --
> -- Create a table to hold the results
> --
> DROP TABLE r_test;
> CREATE TABLE r_test
> ( cnt STRING,
>   word STRING
> )
> PARTITIONED BY(job STRING)
> CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;
>
> --
> -- Perform the sort-merge bucketized outer join
> --
> -- GOOD: This shows "Sorted Merge Bucket Map Join Operator"
> EXPLAIN
> INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
> SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
> FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );
>
> -- This works
> SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
> FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );
>
> file	NULL
> property	134
> apache	NULL
> name	136
>
> --
> -- Add a partition
> --
> FROM v_new_data
> INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache')
> SELECT
>   cnt,
>   word
> WHERE
>   word != 'apache';
>
> -- BAD: This does not show "Sorted Merge Bucket Map Join Operator", only
> -- "Common Join Operator"
> EXPLAIN
> INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
> SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
> FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );
>
> NOTE: The reduce-side join will succeed, but only with the small data set.
> The bad (reduce-side) join, combined with a larger data set (e.g. 45M rows,
> 4GB of data, in a table with 2 bucketized partitions), NEVER even completes a
> single mapper.
>
> Of course, I'd appreciate any help that can be provided.
> I'm impressed with Hive so far and hope to use it to replace an RDBMS for
> "real" ETL, but that's only possible if I can get this sort of operation
> (outering from 10M rows to 10B rows) to work.
> Thanks for the work that's been done on Hive so far, and for any help that
> anyone can offer me in this situation.