[ 
https://issues.apache.org/jira/browse/CARBONDATA-3851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Ramachandra Setty updated CARBONDATA-3851:
-------------------------------------------------
    Description: 
The result sets differ when the same queries are run in spark-shell --master
local and in spark-shell --master yarn (two different Spark deploy modes).

Steps to Reproduce Issue :

    import scala.collection.JavaConverters._
    import java.sql.Date
    
    import org.apache.spark.sql._
    import org.apache.spark.sql.CarbonSession._
    import org.apache.spark.sql.catalyst.TableIdentifier
    import 
org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
 DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
WhenNotMatchedAndExistsOnlyOnTarget}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.test.util.QueryTest
    import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
StringType, StructField, StructType}
    import spark.implicits._
        
   sql("drop table if exists order").show()
    sql("drop table if exists order_hist").show()
    sql("create table order_hist(id string, name string, quantity int, price 
int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
        
        val initframe = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, 
s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", 
"quantity", "price", "state")
        
    initframe.write
      .format("carbondata")
      .option("tableName", "order")
      .option("partitionColumns", "c_name")
      .mode(SaveMode.Overwrite)
      .save()

    val dwframe = spark.read.format("carbondata").option("tableName", 
"order").load()
    val dwSelframe = dwframe.as("A")

        
        val ds1 = sc.parallelize(3 to 10, 4)
      .map {x =>
        if (x <= 4) {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2)
        } else {
          ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
        }
      }.toDF("id", "name", "c_name", "quantity", "price", "state")
          
    ds1.show()
    val ds2 = sc.parallelize(1 to 2, 4)
      .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1)
      }.toDS().toDF()
    ds2.show()
    val ds3 = ds1.union(ds2)
        ds3.show()
        
val odsframe = ds3.as("B")


   var matches = Seq.empty[MergeMatch]
   val updateMap = Map(col("id") -> col("A.id"),
      col("price") -> expr("B.price + 1"),
      col("state") -> col("B.state"))

    val insertMap = Map(col("id") -> col("B.id"),
      col("name") -> col("B.name"),
      col("c_name") -> col("B.c_name"),
      col("quantity") -> col("B.quantity"),
      col("price") -> expr("B.price * 100"),
      col("state") -> col("B.state"))

    val insertMap_u = Map(col("id") -> col("id"),
      col("name") -> col("name"),
      col("c_name") -> lit("insert"),
      col("quantity") -> col("quantity"),
      col("price") -> expr("price"),
      col("state") -> col("state"))

    val insertMap_d = Map(col("id") -> col("id"),
      col("name") -> col("name"),
      col("c_name") -> lit("delete"),
      col("quantity") -> col("quantity"),
      col("price") -> expr("price"),
      col("state") -> col("state"))

   matches ++= Seq(WhenMatched(Some(col("A.state") =!= 
col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u,
 TableIdentifier("order_hist"))))
   matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
   matches ++= 
Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d,
 TableIdentifier("order_hist"))))

sql("select count(*) from order").show()
sql("select count(*) from order where state = 2").show()
sql("select price from order where id = 'newid1'").show()
sql("select count(*) from order_hist where c_name = 'delete'").show()
sql("select count(*) from order_hist where c_name = 'insert'").show()


*Results in spark-shell --master yarn*

scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|      10|
+--------+


scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+


scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
+-----+


scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+


scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|       0|
+--------+


*Results in spark-shell --master local* 

 scala> sql("select count(*) from order").show()
+--------+
|count(1)|
+--------+
|      10|
+--------+


scala> sql("select count(*) from order where state = 2").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+


scala> sql("select price from order where id = 'newid1'").show()
+-----+
|price|
+-----+
| 7500|
+-----+


scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+


scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
+--------+
|count(1)|
+--------+
|       2|
+--------+


 

  was:The result sets are different when run the queries in spark-shell--master 
local and spark-shell --master yarn (Two Different Spark Deploy Modes)


> Merge Update and Insert with Partition Table is giving different results in 
> different spark deploy modes
> --------------------------------------------------------------------------------------------------------
>
>                 Key: CARBONDATA-3851
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-3851
>             Project: CarbonData
>          Issue Type: Bug
>          Components: spark-integration
>    Affects Versions: 2.0.0
>            Reporter: Sachin Ramachandra Setty
>            Priority: Major
>
> The result sets differ when the same queries are run in spark-shell --master
> local and in spark-shell --master yarn (two different Spark deploy modes).
> Steps to Reproduce Issue :
>     import scala.collection.JavaConverters._
>     import java.sql.Date
>     
>     import org.apache.spark.sql._
>     import org.apache.spark.sql.CarbonSession._
>     import org.apache.spark.sql.catalyst.TableIdentifier
>     import 
> org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand,
>  DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, 
> MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, 
> WhenNotMatchedAndExistsOnlyOnTarget}
>     import org.apache.spark.sql.functions._
>     import org.apache.spark.sql.test.util.QueryTest
>     import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, 
> StringType, StructField, StructType}
>     import spark.implicits._
>       
>    sql("drop table if exists order").show()
>     sql("drop table if exists order_hist").show()
>     sql("create table order_hist(id string, name string, quantity int, price 
> int, state int) PARTITIONED BY (c_name String) STORED AS carbondata").show()
>       
>       val initframe = sc.parallelize(1 to 10, 4).map{ x => ("id"+x, 
> s"order$x",s"customer$x", x*10, x*75, 1)}.toDF("id", "name", "c_name", 
> "quantity", "price", "state")
>       
>     initframe.write
>       .format("carbondata")
>       .option("tableName", "order")
>       .option("partitionColumns", "c_name")
>       .mode(SaveMode.Overwrite)
>       .save()
>     val dwframe = spark.read.format("carbondata").option("tableName", 
> "order").load()
>     val dwSelframe = dwframe.as("A")
>       
>       val ds1 = sc.parallelize(3 to 10, 4)
>       .map {x =>
>         if (x <= 4) {
>           ("id"+x, s"order$x",s"customer$x", x*10, x*75, 2)
>         } else {
>           ("id"+x, s"order$x",s"customer$x", x*10, x*75, 1)
>         }
>       }.toDF("id", "name", "c_name", "quantity", "price", "state")
>         
>     ds1.show()
>     val ds2 = sc.parallelize(1 to 2, 4)
>       .map {x => ("newid"+x, s"order$x",s"customer$x", x*10, x*75, 1)
>       }.toDS().toDF()
>     ds2.show()
>     val ds3 = ds1.union(ds2)
>       ds3.show()
>       
> val odsframe = ds3.as("B")
>    var matches = Seq.empty[MergeMatch]
>    val updateMap = Map(col("id") -> col("A.id"),
>       col("price") -> expr("B.price + 1"),
>       col("state") -> col("B.state"))
>     val insertMap = Map(col("id") -> col("B.id"),
>       col("name") -> col("B.name"),
>       col("c_name") -> col("B.c_name"),
>       col("quantity") -> col("B.quantity"),
>       col("price") -> expr("B.price * 100"),
>       col("state") -> col("B.state"))
>     val insertMap_u = Map(col("id") -> col("id"),
>       col("name") -> col("name"),
>       col("c_name") -> lit("insert"),
>       col("quantity") -> col("quantity"),
>       col("price") -> expr("price"),
>       col("state") -> col("state"))
>     val insertMap_d = Map(col("id") -> col("id"),
>       col("name") -> col("name"),
>       col("c_name") -> lit("delete"),
>       col("quantity") -> col("quantity"),
>       col("price") -> expr("price"),
>       col("state") -> col("state"))
>    matches ++= Seq(WhenMatched(Some(col("A.state") =!= 
> col("B.state"))).addAction(UpdateAction(updateMap)).addAction(InsertInHistoryTableAction(insertMap_u,
>  TableIdentifier("order_hist"))))
>    matches ++= Seq(WhenNotMatched().addAction(InsertAction(insertMap)))
>    matches ++= 
> Seq(WhenNotMatchedAndExistsOnlyOnTarget().addAction(DeleteAction()).addAction(InsertInHistoryTableAction(insertMap_d,
>  TableIdentifier("order_hist"))))
> sql("select count(*) from order").show()
> sql("select count(*) from order where state = 2").show()
> sql("select price from order where id = 'newid1'").show()
> sql("select count(*) from order_hist where c_name = 'delete'").show()
> sql("select count(*) from order_hist where c_name = 'insert'").show()
> *Results in spark-shell --master yarn*
> scala> sql("select count(*) from order").show()
> +--------+
> |count(1)|
> +--------+
> |      10|
> +--------+
> scala> sql("select count(*) from order where state = 2").show()
> +--------+
> |count(1)|
> +--------+
> |       0|
> +--------+
> scala> sql("select price from order where id = 'newid1'").show()
> +-----+
> |price|
> +-----+
> +-----+
> scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
> +--------+
> |count(1)|
> +--------+
> |       0|
> +--------+
> scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
> +--------+
> |count(1)|
> +--------+
> |       0|
> +--------+
> *Results in spark-shell --master local* 
>  scala> sql("select count(*) from order").show()
> +--------+
> |count(1)|
> +--------+
> |      10|
> +--------+
> scala> sql("select count(*) from order where state = 2").show()
> +--------+
> |count(1)|
> +--------+
> |       2|
> +--------+
> scala> sql("select price from order where id = 'newid1'").show()
> +-----+
> |price|
> +-----+
> | 7500|
> +-----+
> scala> sql("select count(*) from order_hist where c_name = 'delete'").show()
> +--------+
> |count(1)|
> +--------+
> |       2|
> +--------+
> scala> sql("select count(*) from order_hist where c_name = 'insert'").show()
> +--------+
> |count(1)|
> +--------+
> |       2|
> +--------+
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to