This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new d8f7df9 [CARBONDATA-4211] Fix - from xx Insert into select fails if an SQL statement contains multiple inserts
d8f7df9 is described below
commit d8f7df9d7bab1588583d96a7e1f1dab59b31622f
Author: ShreelekhyaG <[email protected]>
AuthorDate: Mon Jun 14 21:06:16 2021 +0530
[CARBONDATA-4211] Fix - from xx Insert into select fails if an SQL statement contains multiple inserts
Why is this PR needed?
When multiple inserts are issued in a single query, execution fails from SparkPlan with:
java.lang.ClassCastException: GenericInternalRow cannot be cast to UnsafeRow.
For every successful insert/load, the Segment ID is returned as a row. For multiple inserts,
a row containing the Segment ID is returned as well, but Spark throws the
ClassCastException while processing it.
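For illustration, a minimal sketch of the failing shape (hypothetical table names, assuming
a SparkSession with the CarbonData extensions and a sql helper as in the test suites):

    // Hypothetical tables; the single FROM ... INSERT ... INSERT ... statement produces a
    // Union node in the plan, and each insert branch reports its Segment ID as a row.
    sql("create table src(a int, b int) stored as carbondata")
    sql("create table t1(a int, b int) stored as carbondata")
    sql("create table t2(a int, b int) stored as carbondata")
    // Before this fix, collecting the result of the multi-insert statement failed with
    // java.lang.ClassCastException: GenericInternalRow cannot be cast to UnsafeRow.
    sql("from src " +
      "insert into t1 select a, b where a is not null " +
      "insert into t2 select a, b where a is null").collect()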
What changes were proposed in this PR?
When a query contains multiple inserts, its plan has a Union node. Based on its presence,
the flag containsMultipleInserts is set so that the new class UnionCommandExec is used;
it implements a custom sideEffectResult that converts the GenericInternalRow to an
UnsafeRow and returns it.
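The conversion itself follows the usual Spark pattern of turning an external row into an
internal row and then projecting it to an UnsafeRow. A minimal, self-contained sketch of
that idea (illustrative schema and value only, not the committed operator):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Assumed single-column schema standing in for the command output (a Segment ID string).
    val schema = StructType(Seq(StructField("Segment ID", StringType)))
    // Convert an external Row to a catalyst InternalRow (a GenericInternalRow here) ...
    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
    val internalRow = toCatalyst(Row("0")).asInstanceOf[InternalRow]
    // ... and project it to an UnsafeRow, which is what the parent Union node expects.
    val unsafeRow = UnsafeProjection.create(schema.fields.map(_.dataType))(internalRow)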
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4151
---
.../apache/spark/sql/CarbonCatalystOperators.scala | 3 +-
.../spark/sql/execution/strategy/DMLStrategy.scala | 45 +++++++++++++++++++---
.../spark/sql/hive/CarbonAnalysisRules.scala | 9 +++--
.../testsuite/dataload/TestLoadDataGeneral.scala | 21 ++++++++++
4 files changed, 68 insertions(+), 10 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index c5fa35d..4813aa5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -67,7 +67,8 @@ case class InsertIntoCarbonTable (table: CarbonDatasourceHadoopRelation,
partition: Map[String, Option[String]],
child: LogicalPlan,
overwrite: Boolean,
- ifNotExists: Boolean)
+ ifNotExists: Boolean,
+ containsMultipleInserts: Boolean)
extends Command {
override def output: Seq[Attribute] = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
index 33c52e5..af1350d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
@@ -22,15 +22,16 @@ import java.util.Locale
import scala.collection.mutable
import org.apache.log4j.Logger
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{CarbonCountStar, CarbonDatasourceHadoopRelation, CarbonToSparkAdapter, CountStarPlan, InsertIntoCarbonTable, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, AttributeReference, Cast, Descending, Expression, IntegerLiteral, Literal, NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, IntegerLiteral, Literal, NamedExpression, ScalaUDF, SortOrder, UnsafeProjection}
import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalOperation}
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, Limit, LogicalPlan, Project, ReturnAnswer, Sort}
-import org.apache.spark.sql.execution.{CarbonTakeOrderedAndProjectExec, FilterExec, PlanLater, ProjectExec, SparkPlan, SparkStrategy}
-import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec, LoadDataCommand}
+import org.apache.spark.sql.execution.{CarbonTakeOrderedAndProjectExec, FilterExec, LeafExecNode, PlanLater, ProjectExec, SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec, LoadDataCommand, RunnableCommand}
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.execution.joins.{BroadCastPolygonFilterPushJoin, BuildLeft, BuildRight}
import org.apache.spark.sql.execution.strategy.CarbonPlanHelper.isCarbonTable
@@ -57,7 +58,16 @@ object DMLStrategy extends SparkStrategy {
case loadData: LoadDataCommand if isCarbonTable(loadData.table) =>
ExecutedCommandExec(DMLHelper.loadData(loadData)) :: Nil
case insert: InsertIntoCarbonTable =>
- ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+ if (insert.containsMultipleInserts) {
+ // Successful insert in carbon will return segment ID in a row.
+ // In-case of this specific multiple inserts scenario the Union node executes in the
+ // physical plan phase of the command, so the rows should be of unsafe row object.
+ // So we should override the sideEffectResult to prepare the content of command's
+ // corresponding rdd from physical plan of insert into command.
+ UnionCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+ } else {
+ ExecutedCommandExec(CarbonPlanHelper.insertInto(insert)) :: Nil
+ }
case insert: InsertIntoHadoopFsRelationCommand
if insert.catalogTable.isDefined && isCarbonTable(insert.catalogTable.get.identifier) =>
DataWritingCommandExec(DMLHelper.insertInto(insert), planLater(insert.query)) :: Nil
@@ -365,3 +375,26 @@ object DMLStrategy extends SparkStrategy {
}
}
+/**
+ * This class will be used when Union node is present in plan with multiple inserts.
+ * It is a physical operator that executes the run method of a RunnableCommand and
+ * saves the result to prevent multiple executions.
+ */
+case class UnionCommandExec(cmd: RunnableCommand) extends LeafExecNode {
+
+ protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+ val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+ val internalRow = cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
+ val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+ // To make GenericInternalRow to UnsafeRow
+ val row = unsafeProjection(internalRow.head)
+ Seq(row)
+ }
+
+ override def output: Seq[Attribute] = cmd.output
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ sqlContext.sparkContext.parallelize(sideEffectResult, 1)
+ }
+}
+
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 8261415..6fabf09 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -250,13 +250,15 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- castChildOutput(p, relation, child)
+ // when plan contains Union, it can have multiple insert statements as its children
+ castChildOutput(p, relation, child, plan.isInstanceOf[Union])
}
}
def castChildOutput(p: InsertIntoTable,
relation: LogicalRelation,
- child: LogicalPlan): LogicalPlan = {
+ child: LogicalPlan,
+ containsMultipleInserts: Boolean): LogicalPlan = {
val carbonDSRelation = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
if (carbonDSRelation.carbonRelation.output.size > CarbonCommonConstants
.DEFAULT_MAX_NUMBER_OF_COLUMNS) {
@@ -300,7 +302,8 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
- InsertIntoCarbonTable(carbonDSRelation, p.partition, newChild, overwrite, true)
+ InsertIntoCarbonTable(carbonDSRelation, p.partition, newChild, overwrite, true,
+ containsMultipleInserts)
} else {
CarbonException.analysisException(
"Cannot insert into target table because number of columns mismatch")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 29f8028..8235116 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -394,6 +394,27 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
assert(df.exists(_.get(0).toString.contains("`a`bc`!!d`")))
}
+ test("test load with multiple inserts") {
+ sql("drop table if exists catalog_returns_5")
+ sql("drop table if exists catalog_returns_6")
+ sql("create table catalog_returns_5(cr_returned_date_sk
int,cr_returned_time_sk int," +
+ "cr_item_sk int)ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES
TERMINATED BY '\\n'")
+ sql("insert into catalog_returns_5 values(1,2,3)")
+ sql("create table catalog_returns_6(cr_returned_time_sk int,cr_item_sk
int) partitioned by" +
+ " (cr_returned_date_sk int) stored as carbondata")
+ val df = sql(
+ "from catalog_returns_5 insert overwrite table catalog_returns_6
partition " +
+ "(cr_returned_date_sk) select cr_returned_time_sk, cr_item_sk,
cr_returned_date_sk where " +
+ "cr_returned_date_sk is not null distribute by cr_returned_date_sk
insert overwrite table " +
+ "catalog_returns_6 partition (cr_returned_date_sk) select
cr_returned_time_sk, cr_item_sk, " +
+ "cr_returned_date_sk where cr_returned_date_sk is null distribute by
cr_returned_date_sk")
+ assert(df.collect().size == 2)
+ checkAnswer(df, Seq(Row("0"), Row("1")))
+ checkAnswer(sql("select * from catalog_returns_6"), Seq(Row(2, 3, 1)))
+ sql("drop table if exists catalog_returns_5")
+ sql("drop table if exists catalog_returns_6")
+ }
+
override def afterEach {
sql("DROP TABLE if exists loadtest")
sql("drop table if exists invalidMeasures")