This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 252c789 [CARBONDATA-3491] Return updated/deleted rows count when execute update/delete sql
252c789 is described below
commit 252c789b362a7920663ce177ed7c5c6f27a1e08e
Author: Zhang Zhichao <[email protected]>
AuthorDate: Tue Aug 13 11:00:23 2019 +0800
[CARBONDATA-3491] Return updated/deleted rows count when execute update/delete sql
Return updated/deleted rows count when execute update/delete sql
This closes #3357
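
For context, a minimal usage sketch (illustration only, not part of the patch), assuming a Spark session with CarbonData enabled and a carbon table named test_return_row_count as in the new tests. After this change the DataFrame returned by an update/delete statement carries the affected row count instead of being empty:

    // Hypothetical spark-shell session; the column name matches the
    // AttributeReference declared by the delete command in this patch.
    val result = spark.sql("delete from test_return_row_count where a = 'aaa'")
    result.show()
    // +-----------------+
    // |Deleted Row Count|
    // +-----------------+
    // |                1|
    // +-----------------+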
---
.../testsuite/iud/DeleteCarbonTableTestCase.scala | 19 +++++++++++++
.../testsuite/iud/UpdateCarbonTableTestCase.scala | 33 ++++++++++++++++++++++
.../scala/org/apache/carbondata/spark/KeyVal.scala | 10 +++----
.../apache/spark/util/CarbonReflectionUtils.scala | 16 +++++++++++
.../apache/spark/sql/CarbonCatalystOperators.scala | 6 ++--
.../mutation/CarbonProjectForDeleteCommand.scala | 21 ++++++++++----
.../mutation/CarbonProjectForUpdateCommand.scala | 19 ++++++++-----
.../command/mutation/DeleteExecution.scala | 27 ++++++++++--------
.../spark/sql/hive/CarbonAnalysisRules.scala | 12 +++++++-
9 files changed, 129 insertions(+), 34 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index f26283b..4565d7a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -361,6 +361,25 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists decimal_table")
}
+ test("[CARBONDATA-3491] Return updated/deleted rows count when execute
update/delete sql") {
+ sql("drop table if exists test_return_row_count")
+
+ sql("create table test_return_row_count (a string, b string, c string)
stored by 'carbondata'").show()
+ sql("insert into test_return_row_count select 'aaa','bbb','ccc'").show()
+ sql("insert into test_return_row_count select 'bbb','bbb','ccc'").show()
+ sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
+ sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
+
+ checkAnswer(sql("delete from test_return_row_count where a = 'aaa'"),
+ Seq(Row(1))
+ )
+ checkAnswer(sql("select * from test_return_row_count"),
+ Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "bbb", "ccc"), Row("ccc",
"bbb", "ccc"))
+ )
+
+ sql("drop table if exists test_return_row_count").show()
+ }
+
override def afterAll {
sql("use default")
sql("drop database if exists iud_db cascade")
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index cf45600..ef18035 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -826,6 +826,39 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("""drop table iud.dest11""").show
}
+ test("[CARBONDATA-3491] Return updated/deleted rows count when execute
update/delete sql") {
+ sql("drop table if exists test_return_row_count")
+ sql("drop table if exists test_return_row_count_source")
+
+ sql("create table test_return_row_count (a string, b string, c string)
stored by 'carbondata'").show()
+ sql("insert into test_return_row_count select 'bbb','bbb','ccc'").show()
+ sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
+ sql("insert into test_return_row_count select 'ccc','bbb','ccc'").show()
+
+ sql("create table test_return_row_count_source (a string, b string, c
string) stored by 'carbondata'").show()
+ sql("insert into test_return_row_count_source select
'aaa','eee','ccc'").show()
+ sql("insert into test_return_row_count_source select
'bbb','bbb','ccc'").show()
+ sql("insert into test_return_row_count_source select
'ccc','bbb','ccc'").show()
+ sql("insert into test_return_row_count_source select
'ccc','bbb','ccc'").show()
+
+ checkAnswer(sql("update test_return_row_count set (b) = ('ddd') where a =
'ccc'"),
+ Seq(Row(2))
+ )
+ checkAnswer(sql("select * from test_return_row_count"),
+ Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "ddd", "ccc"), Row("ccc",
"ddd", "ccc"))
+ )
+
+ checkAnswer(sql("update test_return_row_count t set (t.b) = (select s.b
from test_return_row_count_source s where s.a = 'aaa') where t.a = 'ccc'"),
+ Seq(Row(2))
+ )
+ checkAnswer(sql("select * from test_return_row_count"),
+ Seq(Row("bbb", "bbb", "ccc"), Row("ccc", "eee", "ccc"), Row("ccc",
"eee", "ccc"))
+ )
+
+ sql("drop table if exists test_return_row_count")
+ sql("drop table if exists test_return_row_count_source")
+ }
+
override def afterAll {
sql("use default")
sql("drop database if exists iud cascade")
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index c4b1144..9fca245 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -76,22 +76,20 @@ class updateResultImpl
}
trait DeleteDelataResult[K, V] extends Serializable {
- def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors)): (K, V)
+ def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors, Long)): (K, V)
}
class DeleteDelataResultImpl
- extends DeleteDelataResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors)] {
+ extends DeleteDelataResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)] {
override def getKey(key: SegmentStatus,
- value: (SegmentUpdateDetails, ExecutionErrors)): (SegmentStatus, (SegmentUpdateDetails,
- ExecutionErrors)) = {
+ value: (SegmentUpdateDetails, ExecutionErrors, Long)): (SegmentStatus, (SegmentUpdateDetails,
+ ExecutionErrors, Long)) = {
(key, value)
}
}
-
trait PartitionResult[K, V] extends Serializable {
def getKey(key: Int, value: Boolean): (K, V)
-
}
class PartitionResultImpl extends PartitionResult[Int, Boolean] {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 4fc30d02..46692df 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -381,4 +382,19 @@ object CarbonReflectionUtils {
nameField.setAccessible(true)
nameField.set(caseObj, objToSet)
}
+
+ def invokeAnalyzerExecute(analyzer: Analyzer,
+ plan: LogicalPlan): LogicalPlan = {
+ if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
+ val method: Method = analyzer.getClass
+ .getMethod("execute", classOf[LogicalPlan])
+ method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
+ } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+ val method: Method = analyzer.getClass
+ .getMethod("executeAndCheck", classOf[LogicalPlan])
+ method.invoke(analyzer, plan).asInstanceOf[LogicalPlan]
+ } else {
+ throw new UnsupportedOperationException("Spark version not supported")
+ }
+ }
}
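
A brief call-site sketch for the helper added above (illustration only; the wrapper name resolveWithSessionAnalyzer is hypothetical and not in the patch, whose actual call site is in CarbonAnalysisRules further down). It hides the fact that Spark 2.1/2.2 expose Analyzer.execute while Spark 2.3 exposes Analyzer.executeAndCheck:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.util.CarbonReflectionUtils

    // Resolve an unresolved plan with the session's analyzer, whichever
    // Spark version is on the classpath.
    def resolveWithSessionAnalyzer(session: SparkSession, plan: LogicalPlan): LogicalPlan =
      CarbonReflectionUtils.invokeAnalyzerExecute(session.sessionState.analyzer, plan)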
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 160c785..5f745d2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -68,8 +68,8 @@ case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attr
case class ProjectForUpdate(
table: UnresolvedRelation,
columns: List[String],
- children: Seq[LogicalPlan] ) extends LogicalPlan {
- override def output: Seq[AttributeReference] = Seq.empty
+ children: Seq[LogicalPlan]) extends LogicalPlan {
+ override def output: Seq[Attribute] = Seq.empty
}
case class UpdateTable(
@@ -79,7 +79,7 @@ case class UpdateTable(
alias: Option[String] = None,
filer: String) extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq.empty
- override def output: Seq[AttributeReference] = Seq.empty
+ override def output: Seq[Attribute] = Seq.empty
}
case class DeleteRecords(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index ae1d848..45c73ac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.execution.command.mutation
import scala.collection.JavaConverters._
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.types.LongType
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -48,6 +50,10 @@ private[sql] case class CarbonProjectForDeleteCommand(
timestamp: String)
extends DataCommand {
+ override val output: Seq[Attribute] = {
+ Seq(AttributeReference("Deleted Row Count", LongType, nullable = false)())
+ }
+
override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
@@ -104,7 +110,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
// handle the clean up of IUD.
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
- val deletedSegments = DeleteExecution.deleteDeltaExecution(
+ val (deletedSegments, deletedRowCount) = DeleteExecution.deleteDeltaExecution(
databaseNameOp,
tableName,
sparkSession,
@@ -112,16 +118,18 @@ private[sql] case class CarbonProjectForDeleteCommand(
timestamp,
isUpdateOperation = false,
executorErrors)
+
+ // Check for any failures that occurred during delete delta execution
+ if (executorErrors.failureCauses != FailureCauses.NONE) {
+ throw new Exception(executorErrors.errorMsg)
+ }
+
// call IUD Compaction.
HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
isUpdateOperation = false)
DeleteExecution.clearDistributedSegmentCache(carbonTable,
deletedSegments)(sparkSession)
- if (executorErrors.failureCauses != FailureCauses.NONE) {
- throw new Exception(executorErrors.errorMsg)
- }
-
val allDataMapSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
@@ -134,11 +142,13 @@ private[sql] case class CarbonProjectForDeleteCommand(
val deleteFromTablePostEvent: DeleteFromTablePostEvent =
DeleteFromTablePostEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent,
operationContext)
+ Seq(Row(deletedRowCount))
} catch {
case e: HorizontalCompactionException =>
LOGGER.error("Delete operation passed. Exception in Horizontal
Compaction." +
" Please check logs. " + e.getMessage)
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable,
e.compactionTimeStamp.toString)
+ Seq(Row(0L))
case e: Exception =>
LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
@@ -160,7 +170,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
updateLock.unlock()
compactionLock.unlock()
}
- Seq.empty
}
override protected def opName: String = "DELETE DATA"
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index b620e38..686990d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -20,11 +20,12 @@ package org.apache.spark.sql.execution.command.mutation
import scala.collection.JavaConverters._
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.ArrayType
+import org.apache.spark.sql.types.{ArrayType, LongType}
import org.apache.spark.storage.StorageLevel
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -50,8 +51,13 @@ private[sql] case class CarbonProjectForUpdateCommand(
columns: List[String])
extends DataCommand {
+ override val output: Seq[Attribute] = {
+ Seq(AttributeReference("Updated Row Count", LongType, nullable = false)())
+ }
+
override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ var updatedRowCount = 0L
IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
val res = plan find {
case relation: LogicalRelation if relation.relation
@@ -61,7 +67,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
}
if (res.isEmpty) {
- return Seq.empty
+ return Array(Row(updatedRowCount)).toSeq
}
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
if (carbonTable.getPartitionInfo != null &&
@@ -71,6 +77,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
throw new UnsupportedOperationException("Unsupported update operation for range/" +
"hash/list partition table")
}
+
setAuditTable(carbonTable)
setAuditInfo(Map("plan" -> plan.simpleString))
columns.foreach { col =>
@@ -135,7 +142,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
// do delete operation.
- val segmentsToBeDeleted = DeleteExecution.deleteDeltaExecution(
+ val (segmentsToBeDeleted, updatedRowCountTemp) = DeleteExecution.deleteDeltaExecution(
databaseNameOp,
tableName,
sparkSession,
@@ -148,6 +155,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
throw new Exception(executionErrors.errorMsg)
}
+ updatedRowCount = updatedRowCountTemp
// do update operation.
performUpdate(dataSet,
databaseNameOp,
@@ -217,7 +225,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
}
}
- Seq.empty
+ Seq(Row(updatedRowCount))
}
private def performUpdate(
@@ -304,9 +312,6 @@ private[sql] case class CarbonProjectForUpdateCommand(
executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
executorErrors.failureCauses = updateTableModel.executorErrors.failureCauses
-
- Seq.empty
-
}
override protected def opName: String = "UPDATE DATA"
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index f9428a2..cb86cb5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -64,14 +64,15 @@ object DeleteExecution {
dataRdd: RDD[Row],
timestamp: String,
isUpdateOperation: Boolean,
- executorErrors: ExecutionErrors): Seq[Segment] = {
+ executorErrors: ExecutionErrors): (Seq[Segment], Long) = {
- var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
+ var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] = null
val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val tablePath = absoluteTableIdentifier.getTablePath
var segmentsTobeDeleted = Seq.empty[Segment]
+ var operatedRowCount = 0L
val deleteRdd = if (isUpdateOperation) {
val schema =
@@ -97,7 +98,7 @@ object DeleteExecution {
// if no loads are present then no need to do anything.
if (keyRdd.partitions.length == 0) {
- return segmentsTobeDeleted
+ return (segmentsTobeDeleted, operatedRowCount)
}
val blockMappingVO =
carbonInputFormat.getBlockRowCount(
@@ -124,9 +125,9 @@ object DeleteExecution {
val rdd = rowContRdd.join(keyRdd)
res = rdd.mapPartitionsWithIndex(
(index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
- Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] {
+ Iterator[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]] {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
- var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]()
+ var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))]()
while (records.hasNext) {
val ((key), (rowCountDetailsVO, groupedRows)) = records.next
val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
@@ -143,7 +144,7 @@ object DeleteExecution {
// if no loads are present then no need to do anything.
if (res.flatten.isEmpty) {
- return segmentsTobeDeleted
+ return (segmentsTobeDeleted, operatedRowCount)
}
// update new status file
@@ -217,7 +218,7 @@ object DeleteExecution {
timestamp: String,
rowCountDetailsVO: RowCountDetailsVO,
isStandardTable: Boolean
- ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] = {
+ ): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] = {
val result = new DeleteDelataResultImpl()
var deleteStatus = SegmentStatus.LOAD_FAILURE
@@ -228,7 +229,8 @@ object DeleteExecution {
CarbonTablePath.addDataPartPrefix(key.split(CarbonCommonConstants.FILE_SEPARATOR)(1)))
val segmentId = key.split(CarbonCommonConstants.FILE_SEPARATOR)(0)
val deleteDeltaBlockDetails: DeleteDeltaBlockDetails = new DeleteDeltaBlockDetails(blockName)
- val resultIter = new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] {
+ val resultIter =
+ new Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long))] {
val segmentUpdateDetails = new SegmentUpdateDetails()
var TID = ""
var countOfRows = 0
@@ -305,15 +307,18 @@ object DeleteExecution {
}
}
- override def next(): (SegmentStatus, (SegmentUpdateDetails, ExecutionErrors)) = {
+ override def next(): (SegmentStatus, (SegmentUpdateDetails, ExecutionErrors, Long)) = {
finished = true
- result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors))
+ result.getKey(deleteStatus, (segmentUpdateDetails, executorErrors, countOfRows.toLong))
}
}
resultIter
}
- segmentsTobeDeleted
+ if (executorErrors.failureCauses == FailureCauses.NONE) {
+ operatedRowCount = res.flatten.map(_._2._3).sum
+ }
+ (segmentsTobeDeleted, operatedRowCount)
}
def clearDistributedSegmentCache(carbonTable: CarbonTable,
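
To make the new row-count aggregation concrete, here is a small self-contained sketch (types simplified to plain tuples; in the real code the tuple carries SegmentUpdateDetails and ExecutionErrors) of how the driver sums the per-block counts returned by deleteDeltaFunc:

    // Each partition emits (status, (updateDetails, errors, rowsAffectedInBlock));
    // flattening and summing the last tuple element yields the operated row count.
    val perBlockResults: Array[List[(String, (String, String, Long))]] = Array(
      List(("SUCCESS", ("seg0/block0", "none", 2L))),
      List(("SUCCESS", ("seg1/block0", "none", 1L))))
    val operatedRowCount: Long = perBlockResults.flatten.map(_._2._3).sum // 3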
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 9ba2301..9b923b0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -40,6 +40,8 @@ import org.apache.carbondata.core.util.CarbonUtil
case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
private lazy val parser = sparkSession.sessionState.sqlParser
+ private lazy val optimizer = sparkSession.sessionState.optimizer
+ private lazy val analyzer = sparkSession.sessionState.analyzer
private def processUpdateQuery(
table: UnresolvedRelation,
@@ -181,7 +183,15 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
}
val destinationTable =
CarbonReflectionUtils.getUnresolvedRelation(table.tableIdentifier, alias)
- ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
+ // In Spark 2.1 and 2.2, it uses Analyzer.execute method to transform LogicalPlan
+ // but in Spark 2.3, it uses Analyzer.executeAndCheck method
+ val analyzedPlan = CarbonReflectionUtils.invokeAnalyzerExecute(
+ analyzer, ProjectForUpdate(destinationTable, columns, Seq(finalPlan)))
+ // For all commands, they execute eagerly, and will be transformed to
+ // logical plan 'LocalRelation' in analyze phase(please see the code in 'Dataset.logicalPlan'),
+ // so it needs to return logical plan 'CarbonProjectForUpdateCommand' here
+ // instead of 'ProjectForUpdate'
+ optimizer.execute(analyzedPlan)
}