I'm wondering if it's possible to get a sort-merge bucketized join to work
when outering to a partitioned table.
My goal is to be able to do an outer join from a table with 10 million rows
to one with 8 billion rows.
The specific problem I'm having now is that while Hive generates an SMJ
query plan when outering to a table with one (bucketized) partition,
it doesn't seem possible to get an SMJ query plan if the
large/outer/streamed table has a second partition.

Is this a limitation of the feature or is it possible to achieve an SMJ to a
partitioned table?
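
For quick reference, the join in question has the following shape (the full
reproduction script is in the DETAILS section below; j_test is the small table
and d_test is the large, partitioned, bucketized/sorted table):

SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );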



DETAILS


--
-- Environment
--

Hadoop 0.20.2
Hive derived from svn ~8/20/10


--
-- Useful links
--

-- sorted merge join (last updated 3/4/10)
https://issues.apache.org/jira/browse/HIVE-1194

--  join operators
https://issues.apache.org/jira/browse/HIVE-741


--
-- Steps
--

The following steps reproduce the issue:

--
-- Generate test data
--

hadoop fs -put /hadoop/conf /test_data_raw
/hadoop/bin/hadoop jar /hadoop/hadoop-*-examples.jar grep /test_data_raw /test_data_freq '\w+'
-- Cleanup/review data
hadoop fs -rmr /test_data_freq/_logs
hadoop fs -cat /test_data_freq/*
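
For reference, the grep example writes tab-separated <count>\t<word> lines into
/test_data_freq, which is what the external table below maps onto its cnt and
word columns. Illustrative rows only (values taken from the join results
further down, not from an actual listing of this directory):

134     property
136     name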

--
-- Create an external table to the source data
--

CREATE EXTERNAL TABLE v_new_data
( cnt STRING,
  word STRING
)
COMMENT 'Use this to pull data easily into Hive (managed) tables'
ROW FORMAT
  DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/test_data_freq';

select * from v_new_data;

--
-- Minor side issue: CLUSTER BY alone does not produce the same table
-- description as CLUSTER BY plus SORT BY.
--

This says: "Cluster By is a short-cut for both Distribute By and Sort By"
http://wiki.apache.org/hadoop/Hive/LanguageManual/SortBy

However, the following shows a different table description when CLUSTER BY is
used without SORT BY.
The differences include "sortCols" and "SORTBUCKETCOLSPREFIX", which seem
very relevant for SMJ, so SORT BY will be used.


set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

DROP TABLE d_test;
CREATE TABLE d_test
( cnt STRING,
  word STRING
)
PARTITIONED BY(job STRING)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;

DROP TABLE d_test2;
CREATE TABLE d_test2
( cnt STRING,
  word STRING
)
PARTITIONED BY(job STRING)
CLUSTERED BY(word) INTO 4 BUCKETS;


hive> describe extended d_test;

Detailed Table Information Table(tableName:d_test, dbName:default,
owner:root, createTime:1283277354, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null),
FieldSchema(name:word, type:string, comment:null)],
location:hdfs://pos01n:54310/user/hive/warehouse/d_test,
inputFormat:org.apache.hadoop.mapred.TextInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,
compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
parameters:{serialization.format=1}), bucketCols:[word],
sortCols:[Order(col:word, order:1)], parameters:{}),
partitionKeys:[FieldSchema(name:job, type:string, comment:null)],
parameters:{SORTBUCKETCOLSPREFIX=TRUE, transient_lastDdlTime=1283277354},
viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
Time taken: 0.055 seconds
hive> describe extended d_test2;
OK
cnt string
word string
job string

Detailed Table Information Table(tableName:d_test2, dbName:default,
owner:root, createTime:1283277354, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:cnt, type:string, comment:null),
FieldSchema(name:word, type:string, comment:null)],
location:hdfs://pos01n:54310/user/hive/warehouse/d_test2,
inputFormat:org.apache.hadoop.mapred.TextInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat,
compressed:false, numBuckets:4, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
parameters:{serialization.format=1}), bucketCols:[word], sortCols:[],
parameters:{}), partitionKeys:[FieldSchema(name:job, type:string,
comment:null)], parameters:{transient_lastDdlTime=1283277354},
viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
Time taken: 0.051 seconds

DROP TABLE d_test2;

-- 
-- Populate the RIGHT/OUTER TABLE
--

FROM v_new_data
INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache_file')
SELECT
 cnt,
 word
WHERE
  word != 'apache'
  and word != 'file'
;


-- 
-- Create the LEFT/INNER table
--

CREATE TABLE j_test
( cnt STRING,
  word STRING
)
PARTITIONED BY(job STRING)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;

FROM v_new_data
INSERT OVERWRITE TABLE j_test PARTITION(job='from')
SELECT
 cnt,
 word
WHERE
  word = 'apache'
  or word = 'file'
  or word = 'name'
  or word = 'property'
;


-- 
-- Create a table to hold the results
--
DROP TABLE r_test;

CREATE TABLE r_test
(
  cnt STRING,
  word STRING
)
PARTITIONED BY(job STRING)
CLUSTERED BY(word) SORTED BY(word) INTO 4 BUCKETS;

--
-- Perform the sort-merge bucketized outer join
--

-- GOOD: This shows "Sorted Merge Bucket Map Join Operator"
EXPLAIN
INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );

-- This works
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );
--
file NULL
property 134
apache NULL
name 136


-- 
-- Add a partition
--

FROM v_new_data
INSERT OVERWRITE TABLE d_test PARTITION(job='no_apache')
SELECT
 cnt,
 word
WHERE
  word != 'apache'
;


-- BAD: This does not show "Sorted Merge Bucket Map Join Operator", only
-- "Common Join Operator"
EXPLAIN
INSERT OVERWRITE TABLE r_test PARTITION(job='result1')
SELECT /*+ MAPJOIN(b) */ a.word, b.cnt
FROM j_test a LEFT OUTER JOIN d_test b ON ( a.word = b.word );

NOTE: The reduce-side join will succeed, but only with the small data set.
The bad (reduce-side) join, combined with a larger data set (e.g. 45M rows,
4GB of data, in a table with 2 bucketized partitions), never even completes a
single mapper.
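
As a sanity check (my own addition, not part of the original steps), it may be
worth confirming that each partition of d_test really contains the expected 4
bucket files; the paths below assume the default warehouse layout shown in the
DESCRIBE output above:

# each listing should show 4 bucket files if hive.enforce.bucketing took effect
hadoop fs -ls /user/hive/warehouse/d_test/job=no_apache_file
hadoop fs -ls /user/hive/warehouse/d_test/job=no_apache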

Of course, I'd appreciate any help that can be provided.
I'm impressed with Hive so far and hope to use it to replace an RDBMS for
"real" ETL, but that's only possible if I can get this sort of operation
(outering from 10M rows to 10B rows) to work.

Thanks for the work that's been done to Hive so far and for any help that
anyone can offer me in this situation.
