[ 
https://issues.apache.org/jira/browse/CARBONDATA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacky Li resolved CARBONDATA-1076.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.0

> Join Issue caused by dictionary and shuffle exchange
> ----------------------------------------------------
>
>                 Key: CARBONDATA-1076
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-1076
>             Project: CarbonData
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.1.1-incubating, 1.1.0
>         Environment: Carbon + spark 2.1
>            Reporter: chenerlu
>            Assignee: Ravindra Pesala
>             Fix For: 1.2.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> We can reproduce this issue with the following steps:
> Step1: create a carbon table
>  
> carbon.sql("CREATE TABLE IF NOT EXISTS carbon_table (col1 int, col2 int, col3 
> int) STORED by 'carbondata' 
> TBLPROPERTIES('DICTIONARY_INCLUDE'='col1,col2,col3','TABLE_BLOCKSIZE'='4')")
>  
> Step2: load data
> carbon.sql("LOAD DATA LOCAL INPATH '/opt/carbon_table' INTO TABLE 
> carbon_table")
> The file /opt/carbon_table contains the following data:
> col1,col2,col3
> 1,2,3
> 4,5,6
> 7,8,9
>  
> Step3: do the query
> carbon.sql("SELECT c1.col1,c2.col1,c2.col3 FROM (SELECT col1,col2 FROM 
> carbon_table GROUP BY col1,col2) c1 FULL JOIN (SELECT col1,count(col2) as 
> col3 FROM carbon_table GROUP BY col1) c2 ON c1.col1 = c2.col1").show()
> [expected] Hive and Parquet tables both return the result below, which is 
> correct.
> |col1|col1|col3|
> |   1|   1|   1|
> |   4|   4|   1|
> |   7|   7|   1|
> [actually] Carbon returns nulls because rows are matched incorrectly.
> |col1|col1|col3|
> |   1|null|null|
> |null|   4|   1|
> |   4|null|null|
> |null|   7|   1|
> |   7|null|null|
> |null|   1|   1|
> Root cause analysis:
>  
> The query has two subqueries. One subquery decodes the dictionary values 
> after the exchange and the other decodes them before the exchange, which 
> can lead to wrong matches when the full join is executed.
>  
> My idea: can we move the decode before the exchange? I am not very familiar 
> with Carbon query execution, so any ideas on this are welcome.
> The plan is as follows:
>  
> == Physical Plan ==
> SortMergeJoin [col1#3445], [col1#3460], FullOuter
> :- Sort [col1#3445 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(col1#3445, 200)
> :     +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> 
> col1#3445, col2#3446 -> col2#3446, col3#3447 -> 
> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table 
> name :carbon_table, Schema 
> :Some(StructType(StructField(col1,IntegerType,true), 
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), 
> CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, 
> col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name 
> :tempdev, Table name :carbon_table, Schema 
> :Some(StructType(StructField(col1,IntegerType,true), 
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], 
> IncludeProfile(ArrayBuffer(col1#3445)), CarbonAliasDecoderRelation(), 
> org.apache.spark.sql.CarbonSession@69e87cbe
> :        +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], 
> output=[col1#3445])
> :           +- Exchange hashpartitioning(col1#3445, col2#3446, 200)
> :              +- HashAggregate(keys=[col1#3445, col2#3446], functions=[], 
> output=[col1#3445, col2#3446])
> :                 +- Scan CarbonDatasourceHadoopRelation [ Database name 
> :tempdev, Table name :carbon_table, Schema 
> :Some(StructType(StructField(col1,IntegerType,true), 
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] 
> tempdev.carbon_table[col1#3445,col2#3446] 
> +- Sort [col1#3460 ASC NULLS FIRST], false, 0
>    +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 -> 
> col1#3445, col2#3446 -> col2#3446, col3#3447 -> 
> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table 
> name :carbon_table, Schema 
> :Some(StructType(StructField(col1,IntegerType,true), 
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), 
> CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, 
> col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name 
> :tempdev, Table name :carbon_table, Schema 
> :Some(StructType(StructField(col1,IntegerType,true), 
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], 
> IncludeProfile(ArrayBuffer(col1#3460)), CarbonAliasDecoderRelation(), 
> org.apache.spark.sql.CarbonSession@69e87cbe
>       +- HashAggregate(keys=[col1#3460], functions=[count(col2#3461)], 
> output=[col1#3460, col3#3436L])
>          +- Exchange hashpartitioning(col1#3460, 200)
>             +- HashAggregate(keys=[col1#3460], 
> functions=[partial_count(col2#3461)], output=[col1#3460, count#3472L])
>                +- CarbonDictionaryDecoder 
> [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446, 
> col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name 
> :tempdev, Table name :carbon_table, Schema 
> :Some(StructType(StructField(col1,IntegerType,true), 
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]), 
> CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461, 
> col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name 
> :tempdev, Table name :carbon_table, Schema 
> :Some(StructType(StructField(col1,IntegerType,true), 
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])], 
> IncludeProfile(ArrayBuffer(col2#3461)), CarbonAliasDecoderRelation(), 
> org.apache.spark.sql.CarbonSession@69e87cbe
>                   +- Scan CarbonDatasourceHadoopRelation [ Database name 
> :tempdev, Table name :carbon_table, Schema 
> :Some(StructType(StructField(col1,IntegerType,true), 
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ] 
> tempdev.carbon_table[col1#3460,col2#3461]]
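
The root cause described in the report can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions, not CarbonData or Spark code: the dictionary contents, the modulo partitioner (a stand-in for Spark's hash partitioning), and the partition count of 4 are all hypothetical values chosen for illustration. It shows that when one join side is shuffled on decoded values and the other on dictionary surrogate keys, equal col1 values can land in different partitions, so a partition-wise full outer join finds no matches.

```python
# Hypothetical dictionary: decoded value -> surrogate key (illustrative only).
DICTIONARY = {1: 2, 4: 3, 7: 4}
NUM_PARTITIONS = 4  # assumption; the plan in the report uses 200


def partition_of(key):
    """Toy stand-in for hash partitioning: plain modulo on the key."""
    return key % NUM_PARTITIONS


def exchange(rows, key_fn):
    """Shuffle rows into partitions according to key_fn(row)."""
    parts = [[] for _ in range(NUM_PARTITIONS)]
    for row in rows:
        parts[partition_of(key_fn(row))].append(row)
    return parts


def full_outer_join(left_parts, right_parts):
    """Join partition i of the left side only with partition i of the right side.

    Left rows are decoded col1 values; right rows are (decoded col1, count).
    """
    result = []
    for left_rows, right_rows in zip(left_parts, right_parts):
        right_by_key = dict(right_rows)
        for col1 in left_rows:
            if col1 in right_by_key:
                result.append((col1, col1, right_by_key.pop(col1)))
            else:
                result.append((col1, None, None))
        # Right rows that found no partner in this partition.
        for col1, cnt in right_by_key.items():
            result.append((None, col1, cnt))
    return result


left = [1, 4, 7]                   # SELECT col1 ... GROUP BY col1, col2
right = [(1, 1), (4, 1), (7, 1)]   # SELECT col1, count(col2) ... GROUP BY col1

# One side is shuffled on decoded values (decode before exchange).
left_parts = exchange(left, key_fn=lambda v: v)

# The other side is shuffled on surrogate keys (decode after exchange).
right_parts_mismatched = exchange(right, key_fn=lambda r: DICTIONARY[r[0]])

# Shuffling both sides on decoded values keeps equal keys co-located.
right_parts_consistent = exchange(right, key_fn=lambda r: r[0])

broken = full_outer_join(left_parts, right_parts_mismatched)
fixed = full_outer_join(left_parts, right_parts_consistent)

print(broken)
print(fixed)
```

With these illustrative values, `broken` contains six unmatched rows, the same shape as the incorrect result in the report, while `fixed` contains the three matched rows. Shuffling both sides on the same representation is the point of the reporter's suggestion to move the decode before the exchange; whether the actual fix in 1.2.0 takes that approach is not stated in this message.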



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
