This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8842a9e  [CARBONDATA-3630] Update should support limit 1 subquery and empty result subquery
8842a9e is described below

commit 8842a9e6c233ad0237dfe23018e9c786c7c8f70b
Author: ajantha-bhat <ajanthab...@gmail.com>
AuthorDate: Mon Dec 23 20:34:01 2019 +0800

    [CARBONDATA-3630] Update should support limit 1 subquery and empty result subquery
    
    Currently, update has two flows: update by value and update by join.
    a. Update by join should be used only if the subquery has a join with the
    main table; currently the join flow is used even in the non-join scenario. Fixed this.
    b. A subquery with limit 1 is currently not supported because it goes
    through the join flow with the main table. Supported this.
    c. If a subquery has both limit 1 and a join with the main table, the
    current design cannot handle it, so an exception is thrown.
    d. A subquery returning 0 rows now updates the value to null (same
    behavior as MySQL).
    
    This closes #3528
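
    For illustration, a minimal sketch of the new behavior, distilled from the
    tests in this commit (tables t1/t2 and the age/name columns are just the
    test fixtures, not a required schema):

        // value flow: a limit-1 subquery is now executed and its value used
        sql("update t1 set (age) = (select t2.age from t2 order by age limit 1)")
        // a subquery returning 0 rows sets the column to null (as in MySQL)
        sql("update t1 set (age) = (select t2.age from t2 where t2.age != 3)")
        // a subquery that joins with the main table AND has a limit now throws
        sql("update t1 set (age) = (select t2.age from t2 where t2.name = t1.name limit 1)")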
---
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  | 54 ++++++++++++++-
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   | 76 ++++++++++++++++++++--
 2 files changed, 123 insertions(+), 7 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index cae86c0..0f131b1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.testsuite.iud
 import java.io.File
 
 import org.apache.spark.sql.test.Spark2TestQueryExecutor
-import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
@@ -80,6 +80,58 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""drop table iud.dest11""").show
   }
 
+  test("update with subquery having limit 1") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql("create table t1 (age int, name string) stored by 'carbondata'")
+    sql("insert into t1 select 1, 'aa'")
+    sql("insert into t1 select 3, 'bb'")
+    sql("create table t2 (age int, name string) stored by 'carbondata'")
+    sql("insert into t2 select 3, 'Andy'")
+    sql("insert into t2 select 2, 'Andy'")
+    sql("insert into t2 select 1, 'aa'")
+    sql("insert into t2 select 3, 'aa'")
+    sql("update t1 set (age) = " +
+        "(select t2.age from t2 where t2.name = 'Andy' order by  age limit 1) 
" +
+        "where t1.age = 1 ").show(false)
+    checkAnswer(sql("select * from t1"), Seq(Row(2,"aa"), Row(3,"bb")))
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
+
+  test("update with subquery giving 0 rows") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql("create table t1 (age int, name string) stored by 'carbondata'")
+    sql("insert into t1 select 1, 'aa'")
+    sql("create table t2 (age int, name string) stored by 'carbondata'")
+    sql("insert into t2 select 3, 'Andy'")
+    sql("update t1 set (age) = " +
+        "(select t2.age from t2 where t2.age != 3) " +
+        "where t1.age = 1 ").show(false)
+    // should update to null
+    checkAnswer(sql("select * from t1"), Seq(Row(null,"aa")))
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
+
+  test("update with subquery joing with main table and limit") {
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+    sql("create table t1 (age int, name string) stored by 'carbondata'")
+    sql("insert into t1 select 1, 'Andy'")
+    sql("create table t2 (age int, name string) stored by 'carbondata'")
+    sql("insert into t2 select 3, 'Andy'")
+    assert(intercept[AnalysisException] {
+      sql("update t1 set (age) = " +
+          "(select t2.age from t2 where t2.name = t1.name limit 1) " +
+          "where t1.age = 1 ").show(false)
+    }.getMessage.contains("Update subquery having join with main table and limit is " +
+                          "not supported, as it leads to multiple joins, one for each row"))
+    sql("drop table if exists t1")
+    sql("drop table if exists t2")
+  }
+
   test("update carbon table[using destination table columns with where and 
exist]") {
     sql("""drop table if exists iud.dest22""")
     sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) 
STORED BY 'org.apache.carbondata.format'""")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 69145d8..10b661a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,7 +20,8 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.{CarbonToSparkAdapter, DeleteRecords, UpdateTable}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{CarbonToSparkAdapter, Dataset, DeleteRecords, ProjectForUpdate, SparkSession, UpdateTable}
 import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -31,7 +32,7 @@ import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnC
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand}
 import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
 import org.apache.spark.sql.util.CarbonException
@@ -247,8 +248,67 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       case tab ~ columns ~ rest =>
         val (sel, where) = splitQuery(rest)
         val selectPattern = """^\s*select\s+""".r
+        // In case of "update = (subquery) where something":
+        // only if the subquery has a join with the main table should it go to the "update by join" flow,
+        // else it should go to the "update by value" flow.
+        // In the update by value flow, we need the values to update with,
+        // so execute the plan and collect the subquery values if it does not join with the main table.
+        var subQueryResults : String = ""
+        if (selectPattern.findFirstIn(sel.toLowerCase).isDefined) {
+          // subQuery starts with select
+          val mainTableName = tab._4.table
+          val mainTableAlias = if (tab._3.isDefined) {
+            tab._3.get
+          } else {
+            ""
+          }
+          val session = SparkSession.getActiveSession.get
+          val subQueryUnresolvedLogicalPlan = session.sessionState.sqlParser.parsePlan(sel)
+          var isJoinWithMainTable : Boolean = false
+          var isLimitPresent : Boolean = false
+          subQueryUnresolvedLogicalPlan collect {
+            case f: Filter =>
+              f.condition.collect {
+                case attr: UnresolvedAttribute =>
+                  if ((!StringUtils.isEmpty(mainTableAlias) &&
+                       attr.nameParts.head.equalsIgnoreCase(mainTableAlias)) ||
+                      attr.nameParts.head.equalsIgnoreCase(mainTableName)) {
+                    isJoinWithMainTable = true
+                  }
+              }
+            case _: GlobalLimit =>
+              isLimitPresent = true
+          }
+          if (isJoinWithMainTable && isLimitPresent) {
+            throw new UnsupportedOperationException(
+              "Update subquery having join with main table and limit is not supported, " +
+              "as it leads to multiple joins, one for each row")
+          }
+          if (!isJoinWithMainTable) {
+            // Should go as value update, not as join update. So execute the subquery.
+            val analyzedPlan = CarbonReflectionUtils.invokeAnalyzerExecute(session
+              .sessionState
+              .analyzer, subQueryUnresolvedLogicalPlan)
+            val subQueryLogicalPlan = session.sessionState.optimizer.execute(analyzedPlan)
+            val df = Dataset.ofRows(session, subQueryLogicalPlan)
+            val rowsCount = df.count()
+            if (rowsCount == 0L) {
+              // if the row count is 0, update to null
+              subQueryResults = "null"
+            } else if (rowsCount != 1) {
+              throw new UnsupportedOperationException(
+                "update cannot support 1 to N mapping, as more than one value is present " +
+                "for the update key")
+            } else {
+              subQueryResults = "'" + df.collect()(0).toSeq.mkString("','") + 
"'"
+            }
+          }
+        }
         val (selectStmt, relation) =
-          if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined) {
+          if (!selectPattern.findFirstIn(sel.toLowerCase).isDefined ||
+              !StringUtils.isEmpty(subQueryResults)) {
+            // if subQueryResults is not empty, the subquery does not join with the main table,
+            // so use subQueryResults in the update by value flow.
             if (sel.trim.isEmpty) {
               sys.error("At least one source column has to be specified ")
             }
@@ -262,12 +322,16 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
                 }
               case _ => tab._1
             }
-
+            val newSel = if (!StringUtils.isEmpty(subQueryResults)) {
+              subQueryResults
+            } else {
+              sel
+            }
             tab._3 match {
               case Some(a) =>
-                ("select " + sel + " from " + getTableName(tab._2) + " " + 
tab._3.get, relation)
+                ("select " + newSel + " from " + getTableName(tab._2) + " " + 
tab._3.get, relation)
               case None =>
-                ("select " + sel + " from " + getTableName(tab._2), relation)
+                ("select " + newSel + " from " + getTableName(tab._2), 
relation)
             }
 
           } else {
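
For readers skimming the diff: a simplified, standalone sketch of the join-vs-value
detection added above. The method name and shape are illustrative, not the committed
code, but it walks the subquery's unresolved plan the same way the parser change does:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, GlobalLimit}

    // Returns (joinsMainTable, hasLimit) for an update subquery:
    // a filter attribute qualified by the main table name or alias means the
    // subquery is correlated with (joins) the main table; a GlobalLimit node
    // means the subquery carries a limit clause.
    def classifySubquery(session: SparkSession, subquerySql: String,
        mainTable: String, mainTableAlias: String): (Boolean, Boolean) = {
      val plan = session.sessionState.sqlParser.parsePlan(subquerySql)
      var joinsMainTable = false
      var hasLimit = false
      plan collect {
        case f: Filter =>
          f.condition collect {
            case attr: UnresolvedAttribute =>
              val qualifier = attr.nameParts.head
              if (qualifier.equalsIgnoreCase(mainTable) ||
                  (mainTableAlias.nonEmpty && qualifier.equalsIgnoreCase(mainTableAlias))) {
                joinsMainTable = true
              }
          }
        case _: GlobalLimit => hasLimit = true
      }
      (joinsMainTable, hasLimit)
    }

Both flags true together is the unsupported case that throws; no join with the main
table means the subquery is executed eagerly and its value substituted (the value
flow); a join without a limit keeps the existing update-by-join flow.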
