Github user zzcclp commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1469#discussion_r152728438
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
 ---
    @@ -143,52 +246,250 @@ case class CarbonIUDAnalysisRule(sparkSession: 
SparkSession) extends Rule[Logica
           selectPlan
         }
         val finalPlan = if (filter.length > 0) {
    -      val alias = table.alias.getOrElse("")
           var transformed: Boolean = false
           // Create a dummy projection to include filter conditions
           var newPlan: LogicalPlan = null
           if (table.tableIdentifier.database.isDefined) {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.database.getOrElse("") + "." +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   
table.tableIdentifier.database.getOrElse("") + "." +
    +                                   table.tableIdentifier.table + " " + 
alias.getOrElse("") + " " +
    +                                   filter)
           }
           else {
             newPlan = parser.parsePlan("select * from  " +
    -           table.tableIdentifier.table + " " + alias + " " + filter)
    +                                   table.tableIdentifier.table + " " + 
alias.getOrElse("") + " " +
    +                                   filter)
           }
           newPlan transform {
    -        case UnresolvedRelation(t, Some(a))
    -          if !transformed && t == table.tableIdentifier && a == alias =>
    +        case CarbonUnresolvedRelation(t)
    +          if !transformed && t == table.tableIdentifier =>
               transformed = true
               // Add the filter condition of update statement  on destination 
table
    -          SubqueryAlias(alias, updatedSelectPlan, 
Option(table.tableIdentifier))
    +          // SubqueryAlias(alias.getOrElse(""), updatedSelectPlan, 
Option(table.tableIdentifier))
    +          if (sparkSession.version.contains("2.1")) {
    +            // SubqueryAlias(alias1, updatedSelectPlan, 
Option(table.tableIdentifier))
    +            val clazz = Utils
    +              
.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +            val ctor = clazz.getConstructors.head
    +            ctor.setAccessible(true)
    +            val subqueryAlias = ctor
    +              .newInstance(alias.getOrElse(""), updatedSelectPlan, 
Option(table.tableIdentifier))
    +              .asInstanceOf[SubqueryAlias]
    +            subqueryAlias
    +          } else if (sparkSession.version.contains("2.2")) {
    +            // 
SubqueryAlias(table.output.map(_.withQualifier(Some(table.tableName))).toString(),
    +            //  Project(projList, relation))
    +            val clazz = Utils
    +              
.classForName("org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias")
    +            val ctor = clazz.getConstructors.head
    +            ctor.setAccessible(true)
    +            val subqueryAlias = ctor.newInstance(alias.getOrElse(""), 
updatedSelectPlan)
    +              .asInstanceOf[SubqueryAlias]
    +            subqueryAlias
    +          } else {
    +            throw new UnsupportedOperationException("Unsupported Spark 
version")
    +          }
           }
         } else {
           updatedSelectPlan
         }
         val tid = 
CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
         val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession))
    -    val destinationTable = UnresolvedRelation(table.tableIdentifier, 
table.alias)
    +    // TODO use reflection
    +    // val destinationTable = UnresolvedRelation(table.tableIdentifier, 
Some(alias.getOrElse("")))
    +    val destinationTable =
    +      if (sparkSession.version.contains("2.1")) {
    +      val clazz = 
Utils.classForName("org.apache.spark.sql.catalyst.analysis.UnresolvedRelation")
    +      val ctor = clazz.getConstructors.head
    +      ctor.setAccessible(true)
    +      val unresolvedrelation = ctor
    +        .newInstance(table.tableIdentifier,
    +          Some(alias.getOrElse(""))).asInstanceOf[UnresolvedRelation]
    +        unresolvedrelation
    +    } else if (sparkSession.version.contains("2.2")) {
    --- End diff --
    
    Use `startsWith` instead of `contains` when checking `sparkSession.version`: `version.contains("2.1")` is a substring match and would incorrectly match unrelated versions (e.g. "2.2.1" contains "2.1"), whereas `version.startsWith("2.1")` matches only the intended major.minor prefix.


---

Reply via email to