akashrn5 commented on a change in pull request #4151:
URL: https://github.com/apache/carbondata/pull/4151#discussion_r654207938
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
}
+ test("test load with multiple inserts") {
+ sql("drop table if exists catalog_returns_5")
+ sql("drop table if exists catalog_returns_6")
+ sql("create table catalog_returns_5(cr_returned_date_sk
int,cr_returned_time_sk int," +
+ "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES
TERMINATED BY '\\n'")
+ sql("insert into catalog_returns_5 values(1,2,3)")
+ sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk
int) partitioned by" +
+ " (cr_returned_date_sk int) stored as carbondata")
+ sql(
+ "from catalog_returns_5 insert overwrite table catalog_returns_6
partition " +
+ "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk,
cr_returned_date_sk where " +
+ "cr_returned_date_sk is not null distribute by cr_returned_date_sk
insert overwrite table " +
+ "catalog_returns_6 partition (cr_returned_date_sk) select
cr_returned_time_sk, cr_item_sk, " +
+ "cr_returned_date_sk where cr_returned_date_sk is null distribute by
cr_returned_date_sk")
Review comment:
Here, please collect the segment IDs returned from the insert command and validate them as well.
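A minimal sketch of that validation, replacing the bare `sql(...)` call above; it assumes the multi-insert returns one row per insert statement with the segment ID in the first column (that row shape is an assumption here, not confirmed by the diff):
```scala
// Hypothetical sketch: capture the rows returned by the multi-insert and
// validate the segment IDs they carry (assumed: one row per insert,
// segment ID in the first column).
val segmentIds = sql(
  "from catalog_returns_5 insert overwrite table catalog_returns_6 partition " +
  "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, cr_returned_date_sk where " +
  "cr_returned_date_sk is not null distribute by cr_returned_date_sk insert overwrite table " +
  "catalog_returns_6 partition (cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, " +
  "cr_returned_date_sk where cr_returned_date_sk is null distribute by cr_returned_date_sk")
  .collect().map(_.get(0).toString)
// Two insert statements should produce two segment IDs.
assert(segmentIds.length == 2)
assert(segmentIds.forall(_.nonEmpty))
```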
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
}
+ test("test load with multiple inserts") {
+ sql("drop table if exists catalog_returns_5")
+ sql("drop table if exists catalog_returns_6")
+ sql("create table catalog_returns_5(cr_returned_date_sk
int,cr_returned_time_sk int," +
+ "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES
TERMINATED BY '\\n'")
+ sql("insert into catalog_returns_5 values(1,2,3)")
+ sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk
int) partitioned by" +
+ " (cr_returned_date_sk int) stored as carbondata")
+ sql(
+ "from catalog_returns_5 insert overwrite table catalog_returns_6
partition " +
+ "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk,
cr_returned_date_sk where " +
+ "cr_returned_date_sk is not null distribute by cr_returned_date_sk
insert overwrite table " +
+ "catalog_returns_6 partition (cr_returned_date_sk) select
cr_returned_time_sk, cr_item_sk, " +
+ "cr_returned_date_sk where cr_returned_date_sk is null distribute by
cr_returned_date_sk")
+ checkAnswer(sql("select *from catalog_returns_6"), Seq(Row(2, 3, 1)))
Review comment:
```suggestion
checkAnswer(sql("select *cfrom catalog_returns_6"), Seq(Row(2, 3, 1)))
```
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
}
}
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
+
+ override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
+
+ protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+ val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+ val internalRow = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
+ val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+ // To make GenericInternalRow to UnsafeRow
+ val row = unsafeProjection(internalRow.head)
+ Seq(row)
+ }
+
+ override protected def innerChildren: Seq[QueryPlan[_]] = cmd :: Nil
+
+ override def output: Seq[Attribute] = cmd.output
+
+ override def nodeName: String = "Execute " + cmd.nodeName
+
+ override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
+
+ override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
+
+ override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+ }
Review comment:
There is no need to override all of these; just override the abstract members, like `doExecute()` and `output`. The others are not used here, so there is no need to override them since they contain no node-specific logic.
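A sketch of the trimmed-down node under that suggestion, keeping only `sideEffectResult`, `output`, and `doExecute()` (imports as in the surrounding file):
```scala
case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {

  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    val internalRow = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
    // Convert the GenericInternalRow produced by the command into an UnsafeRow,
    // which is what the parent Union node expects.
    val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
    Seq(unsafeProjection(internalRow.head))
  }

  override def output: Seq[Attribute] = cmd.output

  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
  }
}
```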
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -57,7 +59,14 @@ object DMLStrategy extends SparkStrategy {
case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
case insert: InsertIntoCarbonTable =>
- ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+ if (insert.isMultipleInserts) {
+ // when multiple insert statements are given with single plan (Union),
+ // Spark expects row to be of UnsafeRow. Here use UnionCommandExec to
+ // implement custom sideEffectResult and return row as UnsafeRow
+ UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
Review comment:
The comment here conveys a slightly different meaning. Instead, you can explain how the insert command in carbon returns rows containing the corresponding segment IDs, and that in this specific multiple-insert scenario the Union node executes in the physical plan phase of the command, so the rows must be UnsafeRow objects. We should therefore override `sideEffectResult` to prepare the content of the command's corresponding RDD from the physical plan of the insert into command. You can clean it up and update the comment to something like this:
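For instance (the wording below is only a sketch of what such a comment could say):
```scala
// Insert into carbon returns rows containing the segment IDs of the inserted
// data. When multiple insert statements share a single plan, the Union node
// executes in the physical plan phase of the command, so the returned rows
// must be UnsafeRow objects. UnionCommandExec overrides sideEffectResult to
// prepare the content of the command's RDD from the physical plan of the
// insert into command.
UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
```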
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -365,3 +374,33 @@ object DMLStrategy extends SparkStrategy {
}
}
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
Review comment:
Please give a class-level comment.
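Something along these lines could work (a sketch, not final wording):
```scala
/**
 * Physical plan node to run an insert command when multiple insert statements
 * share a single (Union) plan. It overrides `sideEffectResult` so that the
 * rows returned by the command, which carry the inserted segment IDs, are
 * converted to UnsafeRow as the parent Union node expects.
 */
case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
```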
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -394,6 +394,25 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
}
+ test("test load with multiple inserts") {
+ sql("drop table if exists catalog_returns_5")
+ sql("drop table if exists catalog_returns_6")
+ sql("create table catalog_returns_5(cr_returned_date_sk
int,cr_returned_time_sk int," +
+ "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES
TERMINATED BY '\\n'")
+ sql("insert into catalog_returns_5 values(1,2,3)")
+ sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk
int) partitioned by" +
+ " (cr_returned_date_sk int) stored as carbondata")
+ sql(
+ "from catalog_returns_5 insert overwrite table catalog_returns_6
partition " +
+ "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk,
cr_returned_date_sk where " +
+ "cr_returned_date_sk is not null distribute by cr_returned_date_sk
insert overwrite table " +
+ "catalog_returns_6 partition (cr_returned_date_sk) select
cr_returned_time_sk, cr_item_sk, " +
+ "cr_returned_date_sk where cr_returned_date_sk is null distribute by
cr_returned_date_sk")
+ checkAnswer(sql("select *from catalog_returns_6"), Seq(Row(2, 3, 1)))
Review comment:
```suggestion
checkAnswer(sql("select * from catalog_returns_6"), Seq(Row(2, 3, 1)))
```
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
##########
@@ -250,13 +251,19 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- castChildOutput(p, relation, child)
+ var isMultipleInserts = false
Review comment:
```suggestion
var containsMultipleInserts = false
```
Please apply this rename in all places.
##########
File path:
integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -57,7 +58,16 @@ object DMLStrategy extends SparkStrategy {
case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
case insert: InsertIntoCarbonTable =>
- ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+ if (insert.isMultipleInserts) {
+ // Successful insert in carbon will return segment ID as row.
Review comment:
```suggestion
// Successful insert in carbon will return segment ID in a row.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]