[
https://issues.apache.org/jira/browse/CARBONDATA-1076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jacky Li resolved CARBONDATA-1076.
----------------------------------
Resolution: Fixed
Fix Version/s: 1.2.0
> Join Issue caused by dictionary and shuffle exchange
> ----------------------------------------------------
>
> Key: CARBONDATA-1076
> URL: https://issues.apache.org/jira/browse/CARBONDATA-1076
> Project: CarbonData
> Issue Type: Bug
> Components: core
> Affects Versions: 0.1.1-incubating, 1.1.0
> Environment: Carbon + spark 2.1
> Reporter: chenerlu
> Assignee: Ravindra Pesala
> Fix For: 1.2.0
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> We can reproduce this issue with the following steps:
> Step1: create a carbon table
>
> carbon.sql("CREATE TABLE IF NOT EXISTS carbon_table (col1 int, col2 int, col3
> int) STORED BY 'carbondata'
> TBLPROPERTIES('DICTIONARY_INCLUDE'='col1,col2,col3','TABLE_BLOCKSIZE'='4')")
>
> Step2: load data
> carbon.sql("LOAD DATA LOCAL INPATH '/opt/carbon_table' INTO TABLE
> carbon_table")
> The data in the file /opt/carbon_table is as follows:
> col1,col2,col3
> 1,2,3
> 4,5,6
> 7,8,9
>
> Step3: do the query
> carbon.sql("SELECT c1.col1,c2.col1,c2.col3 FROM (SELECT col1,col2 FROM
> carbon_table GROUP BY col1,col2) c1 FULL JOIN (SELECT col1,count(col2) as
> col3 FROM carbon_table GROUP BY col1) c2 ON c1.col1 = c2.col1").show()
> [expected] Hive and Parquet tables both return the following result, which
> is correct:
> |col1|col1|col3|
> |   1|   1|   1|
> |   4|   4|   1|
> |   7|   7|   1|
> [actually] Carbon returns null-padded rows because of a wrong match:
> |col1|col1|col3|
> |   1|null|null|
> |null|   4|   1|
> |   4|null|null|
> |null|   7|   1|
> |   7|null|null|
> |null|   1|   1|
> Root cause analysis:
>
> The query contains two subqueries: one performs the dictionary decode after
> the exchange, while the other performs it before the exchange. The two sides
> of the join therefore carry different representations of the join column,
> which leads to a wrong match when executing the full join.
>
> My idea: can we move the decode before the exchange? I am not very familiar
> with the Carbon query path, so any ideas about this are welcome.
> Plan as follows:
>
> == Physical Plan ==
> SortMergeJoin [col1#3445], [col1#3460], FullOuter
> :- Sort [col1#3445 ASC NULLS FIRST], false, 0
> : +- Exchange hashpartitioning(col1#3445, 200)
> : +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 ->
> col1#3445, col2#3446 -> col2#3446, col3#3447 ->
> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table
> name :carbon_table, Schema
> :Some(StructType(StructField(col1,IntegerType,true),
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]),
> CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461,
> col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name
> :tempdev, Table name :carbon_table, Schema
> :Some(StructType(StructField(col1,IntegerType,true),
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])],
> IncludeProfile(ArrayBuffer(col1#3445)), CarbonAliasDecoderRelation(),
> org.apache.spark.sql.CarbonSession@69e87cbe
> : +- HashAggregate(keys=[col1#3445, col2#3446], functions=[],
> output=[col1#3445])
> : +- Exchange hashpartitioning(col1#3445, col2#3446, 200)
> : +- HashAggregate(keys=[col1#3445, col2#3446], functions=[],
> output=[col1#3445, col2#3446])
> : +- Scan CarbonDatasourceHadoopRelation [ Database name
> :tempdev, Table name :carbon_table, Schema
> :Some(StructType(StructField(col1,IntegerType,true),
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]
> tempdev.carbon_table[col1#3445,col2#3446]
> +- Sort [col1#3460 ASC NULLS FIRST], false, 0
> +- CarbonDictionaryDecoder [CarbonDecoderRelation(Map(col1#3445 ->
> col1#3445, col2#3446 -> col2#3446, col3#3447 ->
> col3#3447),CarbonDatasourceHadoopRelation [ Database name :tempdev, Table
> name :carbon_table, Schema
> :Some(StructType(StructField(col1,IntegerType,true),
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]),
> CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461,
> col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name
> :tempdev, Table name :carbon_table, Schema
> :Some(StructType(StructField(col1,IntegerType,true),
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])],
> IncludeProfile(ArrayBuffer(col1#3460)), CarbonAliasDecoderRelation(),
> org.apache.spark.sql.CarbonSession@69e87cbe
> +- HashAggregate(keys=[col1#3460], functions=[count(col2#3461)],
> output=[col1#3460, col3#3436L])
> +- Exchange hashpartitioning(col1#3460, 200)
> +- HashAggregate(keys=[col1#3460],
> functions=[partial_count(col2#3461)], output=[col1#3460, count#3472L])
> +- CarbonDictionaryDecoder
> [CarbonDecoderRelation(Map(col1#3445 -> col1#3445, col2#3446 -> col2#3446,
> col3#3447 -> col3#3447),CarbonDatasourceHadoopRelation [ Database name
> :tempdev, Table name :carbon_table, Schema
> :Some(StructType(StructField(col1,IntegerType,true),
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]),
> CarbonDecoderRelation(Map(col1#3460 -> col1#3460, col2#3461 -> col2#3461,
> col3#3462 -> col3#3462),CarbonDatasourceHadoopRelation [ Database name
> :tempdev, Table name :carbon_table, Schema
> :Some(StructType(StructField(col1,IntegerType,true),
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ])],
> IncludeProfile(ArrayBuffer(col2#3461)), CarbonAliasDecoderRelation(),
> org.apache.spark.sql.CarbonSession@69e87cbe
> +- Scan CarbonDatasourceHadoopRelation [ Database name
> :tempdev, Table name :carbon_table, Schema
> :Some(StructType(StructField(col1,IntegerType,true),
> StructField(col2,IntegerType,true), StructField(col3,IntegerType,true))) ]
> tempdev.carbon_table[col1#3460,col2#3461]]
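The decode-placement mismatch visible in the plan above can be sketched in plain Python. This is not CarbonData or Spark code: the surrogate keys and the modulo partitioner are hypothetical stand-ins for the real dictionary and Spark's hash partitioning, used only to show why equal values can end up in different shuffle partitions.

```python
# Hypothetical dictionary: surrogate key -> decoded value.
dictionary = {2: 1, 3: 4, 4: 7}
NUM_PARTITIONS = 2

def partition(x):
    # Simplified stand-in for Spark's hash partitioning.
    return x % NUM_PARTITIONS

# Left subquery decodes BEFORE its Exchange, so rows are
# partitioned by the real (decoded) value.
left = {value: partition(value) for value in dictionary.values()}

# Right subquery decodes AFTER its Exchange, so rows are
# partitioned by the still-encoded surrogate key.
right = {value: partition(key) for key, value in dictionary.items()}

# Any value whose two partition ids differ can never meet at join
# time, which is how the full join degenerates into null-padded rows.
mismatched = sorted(v for v in dictionary.values() if left[v] != right[v])
print(mismatched)  # [1, 4, 7]
```

With these (made-up) surrogate keys, every value is routed to a different partition on each side, mirroring the all-null join result reported above; moving the decode before both exchanges would make both sides partition on the same representation.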
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)