This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f7df9  [CARBONDATA-4211] Fix - from xx Insert into select fails if 
an SQL statement contains multiple inserts
d8f7df9 is described below

commit d8f7df9d7bab1588583d96a7e1f1dab59b31622f
Author: ShreelekhyaG <[email protected]>
AuthorDate: Mon Jun 14 21:06:16 2021 +0530

    [CARBONDATA-4211] Fix - from xx Insert into select fails if an SQL 
statement contains multiple inserts
    
    Why is this PR needed?
    When a single query containing multiple inserts is used, it fails in
    SparkPlan with: java.lang.ClassCastException:
    GenericInternalRow cannot be cast to UnsafeRow.
    For every successful insert/load, we return the Segment ID as a row. For
    multiple inserts we also return a row containing the Segment ID, but a
    ClassCastException is thrown while Spark processes it.
    
    What changes were proposed in this PR?
    When a query with multiple inserts is given, the plan contains a Union
    node. Based on its presence, the flag containsMultipleInserts is set so
    that the class UnionCommandExec is used; it implements a custom
    sideEffectResult which converts the GenericInternalRow to an UnsafeRow and
    returns it.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4151
---
 .../apache/spark/sql/CarbonCatalystOperators.scala |  3 +-
 .../spark/sql/execution/strategy/DMLStrategy.scala | 45 +++++++++++++++++++---
 .../spark/sql/hive/CarbonAnalysisRules.scala       |  9 +++--
 .../testsuite/dataload/TestLoadDataGeneral.scala   | 21 ++++++++++
 4 files changed, 68 insertions(+), 10 deletions(-)

diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index c5fa35d..4813aa5 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -67,7 +67,8 @@ case class InsertIntoCarbonTable (table: 
CarbonDatasourceHadoopRelation,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean)
+    ifNotExists: Boolean,
+    containsMultipleInserts: Boolean)
   extends Command {
 
     override def output: Seq[Attribute] = {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
index 33c52e5..af1350d 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
@@ -22,15 +22,16 @@ import java.util.Locale
 import scala.collection.mutable
 
 import org.apache.log4j.Logger
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{CarbonCountStar, CarbonDatasourceHadoopRelation, 
CarbonToSparkAdapter, CountStarPlan, InsertIntoCarbonTable, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, 
AttributeReference, Cast, Descending, Expression, IntegerLiteral, Literal, 
NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, 
AttributeReference, Cast, Descending, Expression, IntegerLiteral, Literal, 
NamedExpression, ScalaUDF, SortOrder, UnsafeProjection}
 import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, 
PhysicalOperation}
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, Limit, 
LogicalPlan, Project, ReturnAnswer, Sort}
-import org.apache.spark.sql.execution.{CarbonTakeOrderedAndProjectExec, 
FilterExec, PlanLater, ProjectExec, SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.command.{DataWritingCommandExec, 
ExecutedCommandExec, LoadDataCommand}
+import org.apache.spark.sql.execution.{CarbonTakeOrderedAndProjectExec, 
FilterExec, LeafExecNode, PlanLater, ProjectExec, SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec, 
ExecutedCommandExec, LoadDataCommand, RunnableCommand}
 import 
org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, 
LogicalRelation}
 import org.apache.spark.sql.execution.joins.{BroadCastPolygonFilterPushJoin, 
BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.strategy.CarbonPlanHelper.isCarbonTable
@@ -57,7 +58,16 @@ object DMLStrategy extends SparkStrategy {
       case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
         ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
       case insert: InsertIntoCarbonTable =>
-        ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+        if (insert.containsMultipleInserts) {
+          // Successful insert in carbon will return segment ID in a row.
+          // In-case of this specific multiple inserts scenario the Union node 
executes in the
+          // physical plan phase of the command, so the rows should be of 
unsafe row object.
+          // So we should override the sideEffectResult to prepare the content 
of command's
+          // corresponding rdd from physical plan of insert into command.
+          UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+        } else {
+          ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+        }
       case insert: InsertIntoHadoopFsRelationCommand
         if insert.catalogTable.isDefined && 
isCarbonTable(insert.catalogTable.get.identifier) =>
         DataWritingCommandExec(DMLHelper.insertInto(insert), 
planLater(insert.query)) :: Nil
@@ -365,3 +375,26 @@ object DMLStrategy extends SparkStrategy {
   }
 }
 
+/**
+ * This class will be used when Union node is present in plan with multiple 
inserts.
+ * It is a physical operator that executes the run method of a RunnableCommand 
and
+ * saves the result to prevent multiple executions.
+ */
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
+
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    val internalRow = 
cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
+    val unsafeProjection = 
UnsafeProjection.create(output.map(_.dataType).toArray)
+    // To make GenericInternalRow to UnsafeRow
+    val row = unsafeProjection(internalRow.head)
+    Seq(row)
+  }
+
+  override def output: Seq[Attribute] = cmd.output
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+  }
+}
+
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 8261415..6fabf09 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -250,13 +250,15 @@ case class CarbonPreInsertionCasts(sparkSession: 
SparkSession) extends Rule[Logi
 
       case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
         if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-        castChildOutput(p, relation, child)
+        // when plan contains Union, it can have multiple insert statements as 
its children
+        castChildOutput(p, relation, child, plan.isInstanceOf[Union])
     }
   }
 
   def castChildOutput(p: InsertIntoTable,
       relation: LogicalRelation,
-      child: LogicalPlan): LogicalPlan = {
+      child: LogicalPlan,
+      containsMultipleInserts: Boolean): LogicalPlan = {
     val carbonDSRelation = 
relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     if (carbonDSRelation.carbonRelation.output.size > CarbonCommonConstants
       .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
@@ -300,7 +302,8 @@ case class CarbonPreInsertionCasts(sparkSession: 
SparkSession) extends Rule[Logi
 
       val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
 
-      InsertIntoCarbonTable(carbonDSRelation, p.partition, newChild, 
overwrite, true)
+      InsertIntoCarbonTable(carbonDSRelation, p.partition, newChild, 
overwrite, true,
+        containsMultipleInserts)
     } else {
       CarbonException.analysisException(
         "Cannot insert into target table because number of columns mismatch")
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 29f8028..8235116 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -394,6 +394,27 @@ class TestLoadDataGeneral extends QueryTest with 
BeforeAndAfterEach {
     assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
   }
 
+  test("test load with multiple inserts") {
+    sql("drop table if exists catalog_returns_5")
+    sql("drop table if exists catalog_returns_6")
+    sql("create table catalog_returns_5(cr_returned_date_sk 
int,cr_returned_time_sk int," +
+        "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES 
TERMINATED BY '\\n'")
+    sql("insert into catalog_returns_5 values(1,2,3)")
+    sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk 
int) partitioned by" +
+        " (cr_returned_date_sk int) stored as carbondata")
+    val df = sql(
+      "from catalog_returns_5 insert overwrite table catalog_returns_6 
partition " +
+      "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk, 
cr_returned_date_sk where " +
+      "cr_returned_date_sk is not null distribute by cr_returned_date_sk 
insert overwrite table " +
+      "catalog_returns_6 partition (cr_returned_date_sk) select 
cr_returned_time_sk, cr_item_sk, " +
+      "cr_returned_date_sk where cr_returned_date_sk is null distribute by 
cr_returned_date_sk")
+    assert(df.collect().size == 2)
+    checkAnswer(df, Seq(Row("0"), Row("1")))
+    checkAnswer(sql("select * from catalog_returns_6"), Seq(Row(2, 3, 1)))
+    sql("drop table if exists catalog_returns_5")
+    sql("drop table if exists catalog_returns_6")
+  }
+
   override def afterEach {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")

Reply via email to