[ https://issues.apache.org/jira/browse/CARBONDATA-3852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sachin Ramachandra Setty updated CARBONDATA-3852:
-------------------------------------------------
Description:
The result sets differ when the same SQL queries are run in spark-shell --master
local and spark-shell --master yarn (two different Spark deploy modes).
{code}
import scala.collection.JavaConverters._
import java.sql.Date
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.mutation.merge.{CarbonMergeDataSetCommand, DeleteAction, InsertAction, InsertInHistoryTableAction, MergeDataSetMatches, MergeMatch, UpdateAction, WhenMatched, WhenNotMatched, WhenNotMatchedAndExistsOnlyOnTarget}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.{BooleanType, DateType, IntegerType, StringType, StructField, StructType}
import spark.implicits._

// Auxiliary "order" table (not central to the bug, kept for completeness).
val df1 = sc.parallelize(1 to 10, 4)
  .map { x => ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1) }
  .toDF("id", "name", "c_name", "quantity", "price", "state")
df1.write.format("carbondata").option("tableName", "order").mode(SaveMode.Overwrite).save()
val dwframe = spark.read.format("carbondata").option("tableName", "order").load()
val dwSelframe = dwframe.as("A")

val ds1 = sc.parallelize(3 to 10, 4)
  .map { x =>
    if (x <= 4) {
      ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 2)
    } else {
      ("id" + x, s"order$x", s"customer$x", x * 10, x * 75, 1)
    }
  }.toDF("id", "name", "c_name", "quantity", "price", "state")
val ds2 = sc.parallelize(1 to 2, 4)
  .map { x => ("newid" + x, s"order$x", s"customer$x", x * 10, x * 75, 1) }
  .toDS().toDF()
val ds3 = ds1.union(ds2)
val odsframe = ds3.as("B")

// Target table, partitioned on the "value" column.
sql("drop table if exists target").show()
val initframe = spark.createDataFrame(Seq(
  Row("a", "0"),
  Row("b", "1"),
  Row("c", "2"),
  Row("d", "3")
).asJava, StructType(Seq(StructField("key", StringType), StructField("value", StringType))))
initframe.write
  .format("carbondata")
  .option("tableName", "target")
  .option("partitionColumns", "value")
  .mode(SaveMode.Overwrite)
  .save()
val target = spark.read.format("carbondata").option("tableName", "target").load()

// Change-data-capture feed: (key, newValue, deleted, time).
var ccd = spark.createDataFrame(Seq(
  Row("a", "10", false, 0),
  Row("a", null, true, 1),
  Row("b", null, true, 2),
  Row("c", null, true, 3),
  Row("c", "20", false, 4),
  Row("c", "200", false, 5),
  Row("e", "100", false, 6)
).asJava,
  StructType(Seq(StructField("key", StringType),
    StructField("newValue", StringType),
    StructField("deleted", BooleanType), StructField("time", IntegerType))))
ccd.createOrReplaceTempView("changes")

// Keep only the latest change per key; max(struct(time, ...)) compares on time first.
ccd = sql("""SELECT key, latest.newValue AS newValue, latest.deleted AS deleted
             FROM (SELECT key, max(struct(time, newValue, deleted)) AS latest
                   FROM changes GROUP BY key)""")

val updateMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]
val insertMap = Map("key" -> "B.key", "value" -> "B.newValue").asInstanceOf[Map[Any, Any]]

// Merge the deduplicated changes into the partitioned target table:
// update on match, insert on no-match, delete on match when the change is a delete.
target.as("A").merge(ccd.as("B"), "A.key=B.key").
  whenMatched("B.deleted=false").
  updateExpr(updateMap).
  whenNotMatched("B.deleted=false").
  insertExpr(insertMap).
  whenMatched("B.deleted=true").
  delete().execute()
{code}
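For reference, the same CCD logic can be cross-checked with plain Spark DataFrame operations, independent of CarbonData's merge implementation. This is a minimal sketch assuming the target and ccd DataFrames from the steps above; the crossCheck name and the join/filter/select formulation are illustrative, not part of the merge API:
{code}
// Hypothetical cross-check, no CarbonData merge involved: full-outer-join the
// target with the deduplicated changes and apply the same three merge rules.
val crossCheck = target.as("A")
  .join(ccd.as("B"), col("A.key") === col("B.key"), "full_outer")
  // whenMatched("B.deleted=true") -> delete: drop rows whose latest change is a delete
  .filter(col("B.deleted").isNull || col("B.deleted") === false)
  .select(
    // unmatched source rows (inserts) have no A.key, so coalesce the two sides
    coalesce(col("B.key"), col("A.key")).as("key"),
    // matched and not deleted -> take newValue; untouched target rows keep value
    when(col("B.key").isNotNull, col("B.newValue")).otherwise(col("A.value")).as("value"))
crossCheck.orderBy("key").show()
// Gives (c, 200), (d, 3), (e, 100): two deletes (a, b), one update (c),
// one untouched row (d), one insert (e).
{code}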
SQL queries to run:
{code}
sql("select count(*) from target").show()
sql("select * from target order by key").show()
{code}
Results in spark-shell --master yarn:
{code}
scala> sql("select count(*) from target").show()
+--------+
|count(1)|
+--------+
| 4|
+--------+
scala> sql("select * from target order by key").show()
+---+-----+
|key|value|
+---+-----+
| a| 0|
| b| 1|
| c| 2|
| d| 3|
+---+-----+
{code}
Results in spark-shell --master local:
{code}
scala> sql("select count(*) from target").show()
+--------+
|count(1)|
+--------+
| 3|
+--------+
scala> sql("select * from target order by key").show()
+---+-----+
|key|value|
+---+-----+
| c| 200|
| d| 3|
| e| 100|
+---+-----+
{code}
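Working through the CCD rows by hand gives the same three-row result: the latest change per key deletes a and b, updates c to 200, inserts e, and leaves d untouched. So the local-mode output matches the expected merge semantics, while in yarn mode the merge appears to leave the partitioned target table completely unmodified.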
> CCD Merge with Partition Table is giving different results in different spark
> deploy modes
> ------------------------------------------------------------------------------------------
>
> Key: CARBONDATA-3852
> URL: https://issues.apache.org/jira/browse/CARBONDATA-3852
> Project: CarbonData
> Issue Type: Bug
> Components: spark-integration
> Affects Versions: 2.0.0
> Reporter: Sachin Ramachandra Setty
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)