You are right, Hive's SMJ does not support joining across multiple partitions.
For your case, is it possible to use another approach while still using
SMJ, such as creating external tables or unioning the per-partition join results?
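
For example, something along these lines (a rough, untested sketch: the
d_test_no_apache* table names are made up, the LOCATION paths assume the
default warehouse layout shown in your describe output, and whether the
optimizer still produces the SMJ plan for such external tables would need to
be verified with EXPLAIN):

-- Expose each partition of the big table as its own unpartitioned external
-- table that declares the same bucket/sort layout.
CREATE EXTERNAL TABLE d_test_no_apache_file
( cnt STRING,
  word STRING
)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS
LOCATION '/user/hive/warehouse/d_test/job=no_apache_file';

CREATE EXTERNAL TABLE d_test_no_apache
( cnt STRING,
  word STRING
)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS
LOCATION '/user/hive/warehouse/d_test/job=no_apache';

-- Then union the per-partition join results. Note that a UNION ALL of
-- per-partition outer-join results is not identical to a single outer join
-- against the whole table (an unmatched word produces one NULL row per
-- branch), so the branches may need post-filtering.
SELECT u.word, u.cnt
FROM (
  SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
  FROM j_test a LEFT OUTER JOIN d_test_no_apache_file b ON (a.word = b.word)
  UNION ALL
  SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
  FROM j_test a LEFT OUTER JOIN d_test_no_apache b ON (a.word = b.word)
) u;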

On Tue, Aug 31, 2010 at 8:53 PM, phil young <phil.wills.yo...@gmail.com> wrote:
> I'm wondering if it's possible to get a sort-merge bucketized join to work
> when outering to a partitioned table.
> My goal is to be able to do an outer join from a table with 10 million rows
> to one with 8 billion rows.
> The specific problem I'm having now is that while Hive generates an SMJ
> query plan when outering to a table with one (bucketized) partition,
> it doesn't seem possible to get an SMJ query plan if the
> large/outer/streamed table has a second partition.
> Is this a limitation of the feature or is it possible to achieve an SMJ to a
> partitioned table?
>
>
> DETAILS
>
> --
> -- Environment
> --
> Hadoop 0.20.2
> Hive derived from svn ~8/20/10
>
> --
> -- Useful links
> --
> -- sorted merge join (last updated 3/4/10)
> https://issues.apache.org/jira/browse/HIVE-1194
> --  join operators
> https://issues.apache.org/jira/browse/HIVE-741
>
> --
> -- Steps
> --
> The following steps reproduce the issue
> --
> -- Generate test data
> --
> hadoop fs -put /hadoop/conf /test_data_raw
> /hadoop/bin/hadoop jar /hadoop/hadoop-*-examples.jar grep /test_data_raw /test_data_freq '\w+'
> -- Cleanup/review data
> hadoop fs -rmr /test_data_freq/_logs
> hadoop fs -cat /test_data_freq/*
> --
> -- Create an external table to the source data
> --
> CREATE EXTERNAL TABLE v_new_data
> ( cnt STRING,
>   word STRING
> )
> COMMENT 'Use this to pull data easily into Hive (managed) tables'
> ROW FORMAT
>   DELIMITED FIELDS TERMINATED BY '\t'
> LOCATION '/test_data_freq';
> select * from v_new_data;
> --
> -- Minor side issue: CLUSTERED BY alone does not produce the same table
> -- description as CLUSTERED BY combined with SORTED BY.
> --
> This says: "Cluster By is a short-cut for both Distribute By and Sort By"
> http://wiki.apache.org/hadoop/Hive/LanguageManual/SortBy
> However, the following shows a different table description when CLUSTERED BY
> is used without SORTED BY.
> The differences include "sortCols" and "SORTBUCKETCOLSPREFIX", which seem very
> relevant for SMJ, so SORTED BY will be used.
>
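> -- The settings below enforce bucketing and sorting when the tables are
> -- populated and allow eligible map joins to be converted into sorted-merge
> -- bucket map joins.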
> set hive.enforce.bucketing=true;
> set hive.enforce.sorting=true;
> set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
> set hive.optimize.bucketmapjoin = true;
> set hive.optimize.bucketmapjoin.sortedmerge = true;
> DROP TABLE d_test;
> CREATE TABLE d_test
> ( cnt STRING,
>   word STRING
> )
> PARTITIONED BY(job STRING)
> CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;
> DROP TABLE d_test2;
> CREATE TABLE d_test2
> ( cnt STRING,
>   word STRING
> )
> PARTITIONED BY(job STRING)
> CLUSTERED BY(word) INTO 4 BUCKETS;
>
> hive> describe extended d_test;
> Detailed Table Information Table(tableName:d_test, dbName:default,
> owner:root, createTime:1283277354, lastAccessTime:0, retention:0,
> sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null),
> FieldSchema(name:word, type:string, comment:null)],
> location:hdfs://pos01n:54310/user/hive/warehouse/d_test,
> inputFormat:org.apache.hadoop.mapred.TextInputFormat,
> outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,
> compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null,
> serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
> parameters:{serialization.format=1}), bucketCols:[word],
> sortCols:[Order(col:word, order:1)], parameters:{}),
> partitionKeys:[FieldSchema(name:job, type:string, comment:null)],
> parameters:{SORTBUCKETCOLSPREFIX=TRUE, transient_lastDdlTime=1283277354},
> viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
> Time taken: 0.055 seconds
> hive> describe extended d_test2;
> OK
> cnt string
> word string
> job string
>
> Detailed Table Information Table(tableName:d_test2, dbName:default,
> owner:root, createTime:1283277354, lastAccessTime:0, retention:0,
> sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null),
> FieldSchema(name:word, type:string, comment:null)],
> location:hdfs://pos01n:54310/user/hive/warehouse/d_test2,
> inputFormat:org.apache.hadoop.mapred.TextInputFormat,
> outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,
> compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null,
> serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
> parameters:{serialization.format=1}), bucketCols:[word], sortCols:[],
> parameters:{}), partitionKeys:[FieldSchema(name:job, type:string,
> comment:null)], parameters:{transient_lastDdlTime=1283277354},
> viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
> Time taken: 0.051 seconds
> DROP TABLE d_test2;
> --
> -- Populate the RIGHT/OUTER TABLE
> --
> FROM v_new_data
> INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache_file')
> SELECT
>  cnt,
>  word
> WHERE
>   word != 'apache'
>   and word != 'file';
>
> --
> -- Create the LEFT/INNER table
> --
> CREATE TABLE j_test
> ( cnt STRING,
>   word STRING
> )
> PARTITIONED BY(job STRING)
> CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;
> FROM v_new_data
> INSERT OVERWRITE TABLE j_test PARTITION(job='from')
> SELECT
>  cnt,
>  word
> WHERE
>   word = 'apache'
>   or word = 'file'
>   or word = 'name'
>   or word = 'property'
> ;
>
> --
> -- Create a table to hold the results
> --
> DROP TABLE r_test;
> CREATE TABLE r_test
> (
>   cnt STRING,
>   word STRING
> )
> PARTITIONED BY(job STRING)
> CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;
> --
> -- Perform the sort-merge bucketized outer join
> --
> -- GOOD: This shows "Sorted Merge Bucket Map Join Operator"
> EXPLAIN
> INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
> SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
> FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );
> -- This works
> SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
> FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );
> --
> file NULL
> property 134
> apache NULL
> name 136
>
> --
> -- Add a partition
> --
> FROM v_new_data
> INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache')
> SELECT
>  cnt,
>  word
> WHERE
>   word != 'apache';
>
> -- BAD: This does not show "Sorted Merge Bucket Map Join Operator", only
> -- "Common Join Operator"
> EXPLAIN
> INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
> SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
> FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );
> NOTE: The reduce-side join will succeed, but only with the small data set.
> The bad (reduce-side) join, combined with a larger data set (e.g. 45M rows,
> 4GB of data, in a table with 2 bucketized partitions), NEVER even completes a
> single mapper.
> Of course, I'd appreciate any help that can be provided.
> I'm impressed with Hive so far and hope to use it to replace an RDBMS for
> "real" ETL, but that's only possible if I can get this sort of operation
> (outering from 10M rows to 10B rows) to work.
> Thanks for the work that's been done on Hive so far and for any help that
> anyone can offer me in this situation.
>
>
>
