[
https://issues.apache.org/jira/browse/CARBONDATA-3851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sachin Ramachandra Setty updated CARBONDATA-3851:
-------------------------------------------------
Description:
The result sets are different when running the queries in spark-shell --master local
and spark-shell --master yarn (Two Different Spark Deploy Modes)
Steps to Reproduce Issue :
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.
{CarbonMergeDataSetCommand, DeleteAction, InsertAction,
InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction,
WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.
{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._
sql("drop table if exists order").show()
sql("drop table if exists order_hist").show()
sql("create table order_hist(id string, name string, quantity int, price int,
state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
val initframe = sc.parallelize(1 to 10, 4).map
{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}
.toDF("id", "name", "c_name", "quantity", "price", "state")
initframe.write
.format("carbondata")
.option("tableName", "order")
.option("partitionColumns", "c_name")
.mode(SaveMode.Overwrite)
.save()
val dwframe = spark.read.format("carbondata").option("tableName",
"order").load()
val dwSelframe = dwframe.as("A")
val ds1 = sc.parallelize(3 to 10, 4)
.map {x =>
if (x <= 4)
{ ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2) }
else
{ ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1) }
}.toDF("id", "name", "c_name", "quantity", "price", "state")
ds1.show()
val ds2 = sc.parallelize(1 to 2, 4)
.map
{x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1) }
.toDS().toDF()
ds2.show()
val ds3 = ds1.union(ds2)
ds3.show()
val odsframe = ds3.as("B")
var matches = Seq.empty[MergeMatch]
val updateMap = Map(col("id") -> col("A.id"),
col("price") -> expr("B.price + 1"),
col("state") -> col("B.state"))
val insertMap = Map(col("id") -> col("B.id"),
col("name") -> col("B.name"),
col("c_name") -> col("B.c_name"),
col("quantity") -> col("B.quantity"),
col("price") -> expr("B.price * 100"),
col("state") -> col("B.state"))
val insertMap_u = Map(col("id") -> col("id"),
col("name") -> col("name"),
col("c_name") -> lit("insert"),
col("quantity") -> col("quantity"),
col("price") -> expr("price"),
col("state") -> col("state"))
val insertMap_d = Map(col("id") -> col("id"),
col("name") -> col("name"),
col("c_name") -> lit("delete"),
col("quantity") -> col("quantity"),
col("price") -> expr("price"),
col("state") -> col("state"))
matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u,
TableIdentifier("order_hist"))))
matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
matches ++=
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d,
TableIdentifier("order_hist"))))
{code:java}
sql("select count(*) from order").show()
sql("select count(*) from order where state = 2").show()
sql("select price from order where id = 'newid1'").show()
sql("select count(*) from order_hist where c_name = 'delete'").show()
sql("select count(*) from order_hist where c_name = 'insert'").show()
{code}
sql("select count(*) from order").show()
sql("select count(*) from order where state = 2").show()
sql("select price from order where id = 'newid1'").show()
sql("select count(*) from order_hist where c_name = 'delete'").show()
sql("select count(*) from order_hist where c_name = 'insert'").show()
Results in spark-shell --master yarn
scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|10|
+--------+
scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|0|
+--------+
scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
+-----+
scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|0|
+--------+
scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|0|
+--------+
Results in spark-shell --master local
scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|10|
+--------+
scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|2|
+--------+
scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
|7500|
+-----+
scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|2|
+--------+
scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|2|
+--------+
was:
The result sets are different when run the queries in spark-shell--master local
and spark-shell --master yarn (Two Different Spark Deploy Modes)
Steps to Reproduce Issue :
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.
{CarbonMergeDataSetCommand, DeleteAction, InsertAction,
InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction,
WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.
{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._
sql("drop table if exists order").show()
sql("drop table if exists order_hist").show()
sql("create table order_hist(id string, name string, quantity int, price int,
state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
val initframe = sc.parallelize(1 to 10, 4).map
{ x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}
.toDF("id", "name", "c_name", "quantity", "price", "state")
initframe.write
.format("carbondata")
.option("tableName", "order")
.option("partitionColumns", "c_name")
.mode(SaveMode.Overwrite)
.save()
val dwframe = spark.read.format("carbondata").option("tableName",
"order").load()
val dwSelframe = dwframe.as("A")
val ds1 = sc.parallelize(3 to 10, 4)
.map {x =>
if (x <= 4)
{ ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2) }
else
{ ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1) }
}.toDF("id", "name", "c_name", "quantity", "price", "state")
ds1.show()
val ds2 = sc.parallelize(1 to 2, 4)
.map
{x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1) }
.toDS().toDF()
ds2.show()
val ds3 = ds1.union(ds2)
ds3.show()
val odsframe = ds3.as("B")
var matches = Seq.empty[MergeMatch]
val updateMap = Map(col("id") -> col("A.id"),
col("price") -> expr("B.price + 1"),
col("state") -> col("B.state"))
val insertMap = Map(col("id") -> col("B.id"),
col("name") -> col("B.name"),
col("c_name") -> col("B.c_name"),
col("quantity") -> col("B.quantity"),
col("price") -> expr("B.price * 100"),
col("state") -> col("B.state"))
val insertMap_u = Map(col("id") -> col("id"),
col("name") -> col("name"),
col("c_name") -> lit("insert"),
col("quantity") -> col("quantity"),
col("price") -> expr("price"),
col("state") -> col("state"))
val insertMap_d = Map(col("id") -> col("id"),
col("name") -> col("name"),
col("c_name") -> lit("delete"),
col("quantity") -> col("quantity"),
col("price") -> expr("price"),
col("state") -> col("state"))
matches ++= Seq(WhenMatched(Some(col("A.state") =!=
col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u,
TableIdentifier("order_hist"))))
matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
matches ++=
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d,
TableIdentifier("order_hist"))))
{code:java}
sql("select count from order").show()
sql("select count from order where state = 2").show()
sql("select price from order where id = 'newid1'").show()
sql("select count from order_hist where c_name = 'delete'").show()
sql("select count from order_hist where c_name = 'insert'").show() placeholder
{code}
sql("select count(*) from order").show()
sql("select count(*) from order where state = 2").show()
sql("select price from order where id = 'newid1'").show()
sql("select count(*) from order_hist where c_name = 'delete'").show()
sql("select count(*) from order_hist where c_name = 'insert'").show()
Results in spark-shell --master yarn
scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|10|
+--------+
scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|0|
+--------+
scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
+-----+
scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|0|
+--------+
scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|0|
+--------+
Results in spark-shell --master local
scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|10|
+--------+
scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|2|
+--------+
scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
|7500|
+-----+
scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|2|
+--------+
scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|2|
+--------+
> Merge Update and Insert with Partition Table is giving different results in
> different spark deploy modes
> --------------------------------------------------------------------------------------------------------
>
> Key: CARBONDATA-3851
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3851
> Project: CarbonData
> Issue Type: Bug
> Components: spark-integration
> Affects Versions: 2.0.0
> Reporter: Sachin Ramachandra Setty
> Priority: Major
>
> The result sets are different when running the queries in spark-shell --master
> local and spark-shell --master yarn (Two Different Spark Deploy Modes)
> Steps to Reproduce Issue :
> import scala.collection.JavaConverters._
> import java.sql.Date
> import org.apache.spark.sql._
> import org.apache.spark.sql.CarbonSession._
> import org.apache.spark.sql.catalyst.TableIdentifier
> import org.apache.spark.sql.execution.command.mutation.merge.
> {CarbonMergeDataSetCommand, DeleteAction, InsertAction,
> InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction,
> WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.test.util.QueryTest
> import org.apache.spark.sql.types.
> {BooleanType, DateType, IntegerType, StringType, StructField, StructType}
> import spark.implicits._
> sql("drop table if exists order").show()
> sql("drop table if exists order_hist").show()
> sql("create table order_hist(id string, name string, quantity int, price
> int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
> val initframe = sc.parallelize(1 to 10, 4).map
> { x => ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)}
> .toDF("id", "name", "c_name", "quantity", "price", "state")
> initframe.write
> .format("carbondata")
> .option("tableName", "order")
> .option("partitionColumns", "c_name")
> .mode(SaveMode.Overwrite)
> .save()
> val dwframe = spark.read.format("carbondata").option("tableName",
> "order").load()
> val dwSelframe = dwframe.as("A")
> val ds1 = sc.parallelize(3 to 10, 4)
> .map {x =>
> if (x <= 4)
> { ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2) }
> else
> { ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1) }
> }.toDF("id", "name", "c_name", "quantity", "price", "state")
> ds1.show()
> val ds2 = sc.parallelize(1 to 2, 4)
> .map
> {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1) }
> .toDS().toDF()
> ds2.show()
> val ds3 = ds1.union(ds2)
> ds3.show()
> val odsframe = ds3.as("B")
> var matches = Seq.empty[MergeMatch]
> val updateMap = Map(col("id") -> col("A.id"),
> col("price") -> expr("B.price + 1"),
> col("state") -> col("B.state"))
> val insertMap = Map(col("id") -> col("B.id"),
> col("name") -> col("B.name"),
> col("c_name") -> col("B.c_name"),
> col("quantity") -> col("B.quantity"),
> col("price") -> expr("B.price * 100"),
> col("state") -> col("B.state"))
> val insertMap_u = Map(col("id") -> col("id"),
> col("name") -> col("name"),
> col("c_name") -> lit("insert"),
> col("quantity") -> col("quantity"),
> col("price") -> expr("price"),
> col("state") -> col("state"))
> val insertMap_d = Map(col("id") -> col("id"),
> col("name") -> col("name"),
> col("c_name") -> lit("delete"),
> col("quantity") -> col("quantity"),
> col("price") -> expr("price"),
> col("state") -> col("state"))
> matches ++= Seq(WhenMatched(Some(col("A.state") =!=
> col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u,
> TableIdentifier("order_hist"))))
> matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
> matches ++=
> Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d,
> TableIdentifier("order_hist"))))
>
>
> {code:java}
> sql("select count(*) from order").show()
> sql("select count(*) from order where state = 2").show()
> sql("select price from order where id = 'newid1'").show()
> sql("select count(*) from order_hist where c_name = 'delete'").show()
> sql("select count(*) from order_hist where c_name = 'insert'").show()
> {code}
>
> sql("select count(*) from order").show()
> sql("select count(*) from order where state = 2").show()
> sql("select price from order where id = 'newid1'").show()
> sql("select count(*) from order_hist where c_name = 'delete'").show()
> sql("select count(*) from order_hist where c_name = 'insert'").show()
> Results in spark-shell --master yarn
> scala> sql("select count(*) from order").show()
> +--------+
> |count(1)|
> +--------+
> |10|
> +--------+
> scala> sql("select count(*) from order where state = 2").show()
> +--------+
> |count(1)|
> +--------+
> |0|
> +--------+
> scala> sql("select price from order where id = 'newid1'").show()
> +-----+
> |price|
> +-----+
> +-----+
> scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
> +--------+
> |count(1)|
> +--------+
> |0|
> +--------+
> scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
> +--------+
> |count(1)|
> +--------+
> |0|
> +--------+
> Results in spark-shell --master local
> scala> sql("select count(*) from order").show()
> +--------+
> |count(1)|
> +--------+
> |10|
> +--------+
> scala> sql("select count(*) from order where state = 2").show()
> +--------+
> |count(1)|
> +--------+
> |2|
> +--------+
> scala> sql("select price from order where id = 'newid1'").show()
> +-----+
> |price|
> +-----+
> |7500|
> +-----+
> scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
> +--------+
> |count(1)|
> +--------+
> |2|
> +--------+
> scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
> +--------+
> |count(1)|
> +--------+
> |2|
> +--------+
--
This message was sent by Atlassian Jira
(v8.3.4#803005)