This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 26755cd  [CARBONDATA-3516] Fixed compilation issue for mixed formats 
in Spark-2.1
26755cd is described below

commit 26755cd616c56b92f74d0a22537eb79a980d85d4
Author: manishnalla1994 <[email protected]>
AuthorDate: Fri Oct 4 19:06:47 2019 +0530

    [CARBONDATA-3516] Fixed compilation issue for mixed formats in Spark-2.1
    
    Problem: Compilation issue with Spark-2.1 as FileSourceScanExec takes 
different parameters in Spark-2.1 and Spark-2.3.
    
    Solution: Decoupled the code for Spark-2.1 and Spark-2.2/2.3.
    
    This closes #3406
---
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 49 ++++++++++++----------
 .../apache/spark/sql/MixedFomatHandlerUtil.scala   | 43 +++++++++++++++++++
 .../strategy/CarbonLateDecodeStrategy.scala        | 29 ++++++++++++-
 .../execution/strategy/MixedFormatHandler.scala    | 39 +++++++++++++----
 .../apache/spark/sql/MixedFormatHandlerUtil.scala  | 44 +++++++++++++++++++
 5 files changed, 174 insertions(+), 30 deletions(-)

diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 8358b1b..e39272a 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -21,6 +21,7 @@ import java.nio.file.{Files, Paths}
 
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
 import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -287,11 +288,12 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"""insert into addsegment2 select * from addsegment1""")
 
     sql("select * from addsegment2").show()
-    val table = 
sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier( 
"addsegment2"))
-    val path = table.location.getPath
+    val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("addsegment2"))
+    val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path, newPath)
+    copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
 
     sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='parquet')").show()
@@ -347,6 +349,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     checkAnswer(sql("select empname from addsegment1 where empname='arvind'"), 
Seq(Row("arvind"),Row("arvind"),Row("arvind")))
     checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(30)))
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+    assert(sql("select deptname, deptno from addsegment1 where empname = 
'arvind'")
+             .collect().length == 3)
     FileFactory.deleteAllFilesOfDir(new File(newPath1))
     FileFactory.deleteAllFilesOfDir(new File(newPath2))
   }
@@ -438,11 +442,12 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"""insert into addsegment2 select * from addsegment1""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-    val table = 
sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier( 
"addsegment2"))
-    val path = table.location.getPath
+    val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("addsegment2"))
+    val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path, newPath)
+    copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
 
     sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='parquet')").show()
@@ -478,12 +483,12 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
       """.stripMargin)
     sql(s"""insert into addsegment2 select * from addsegment1""")
 
-    val table = sqlContext.sparkSession.sessionState.catalog
+    val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
       .getTableMetadata(TableIdentifier("addsegment2"))
-    val path = table.location.getPath
+    val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path, newPath)
+    copy(path.toString, newPath)
 
     val res1 = sql("select empname, deptname from addsegment1 where deptno=10")
 
@@ -529,11 +534,12 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql(s"""insert into addsegment2 select * from addsegment1""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-    val table = 
sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier( 
"addsegment2"))
-    val path = table.location.getPath
+    val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("addsegment2"))
+    val path = table.location
     val newPath = storeLocation + "/" + "addsegtest"
     FileFactory.deleteAllFilesOfDir(new File(newPath))
-    copy(path, newPath)
+    copy(path.toString, newPath)
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
 
     sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='parquet')").show()
@@ -558,18 +564,18 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     sql("insert into addSegParless values (3)")
     sql("insert into addSegParmore values (4,'c', 'x')")
 
-    val table1 = sqlContext.sparkSession.sessionState.catalog
+    val table1 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
       .getTableMetadata(TableIdentifier("addSegPar"))
-    val table2 = sqlContext.sparkSession.sessionState.catalog
+    val table2 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
       .getTableMetadata(TableIdentifier("addSegParless"))
-    val table3 = sqlContext.sparkSession.sessionState.catalog
+    val table3 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
       .getTableMetadata(TableIdentifier("addSegParmore"))
 
-    sql(s"alter table addSegCar add segment 
options('path'='${table1.location.getPath}', 'format'='parquet')")
+    sql(s"alter table addSegCar add segment 
options('path'='${table1.location}', 'format'='parquet')")
     intercept[Exception] {
-      sql(s"alter table addSegCar add segment 
options('path'='${table2.location.getPath}', 'format'='parquet')")
+      sql(s"alter table addSegCar add segment 
options('path'='${table2.location}', 'format'='parquet')")
     }
-    sql(s"alter table addSegCar add segment 
options('path'='${table3.location.getPath}', 'format'='parquet')")
+    sql(s"alter table addSegCar add segment 
options('path'='${table3.location}', 'format'='parquet')")
 
     assert(sql("select * from addSegCar").collect().length == 3)
 
@@ -580,11 +586,12 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
   }
 
   private def copyseg(tableName: String, pathName: String): String = {
-    val table1 = 
sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
-    val path1 = table1.location.getPath
+    val table1 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier(tableName))
+    val path1 = table1.location
     val newPath1 = storeLocation + "/" + pathName
     FileFactory.deleteAllFilesOfDir(new File(newPath1))
-    copy(path1, newPath1)
+    copy(path1.toString, newPath1)
     newPath1
   }
 
diff --git 
a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
 
b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
new file mode 100644
index 0000000..d180cd3
--- /dev/null
+++ 
b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+
+object MixedFormatHandlerUtil {
+
+  def getScanForSegments(
+      @transient relation: HadoopFsRelation,
+      output: Seq[Attribute],
+      outputSchema: StructType,
+      partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression],
+      tableIdentifier: Option[TableIdentifier]
+  ): FileSourceScanExec = {
+    FileSourceScanExec(
+      relation,
+      output,
+      outputSchema,
+      partitionFilters,
+      dataFilters,
+      tableIdentifier)
+  }
+}
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 52411d1..3f6b91d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -434,15 +434,20 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
         // revert for row scan
         updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, 
table, needDecoder)
       }
+      val newRequestedColumns = if (!vectorPushRowFilters && 
extraRdd.isDefined) {
+        extractUniqueAttributes(projectsAttr, filterSet.toSeq)
+      } else {
+        updateRequestedColumns.asInstanceOf[Seq[Attribute]]
+      }
       val scan = getDataSourceScan(relation,
-        (updateRequestedColumns.asInstanceOf[Seq[Attribute]], partitions),
+        (newRequestedColumns, partitions),
         scanBuilder,
         candidatePredicates,
         pushedFilters,
         handledFilters,
         metadata,
         needDecoder,
-        updateRequestedColumns.asInstanceOf[Seq[Attribute]], extraRdd)
+        newRequestedColumns, extraRdd)
       // Check whether spark should handle row filters in case of vector flow.
       if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
           && !implicitExisted && !hasDictionaryFilterCols && 
!hasMoreDictionaryCols) {
@@ -473,6 +478,26 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
     }
   }
 
+  /*
+    This function is used to get the Unique attributes from filter set
+    and projection set based on their semantics.
+   */
+  private def extractUniqueAttributes(projections: Seq[Attribute],
+      filter: Seq[Attribute]): Seq[Attribute] = {
+    def checkSemanticEquals(filter: Attribute): Option[Attribute] = {
+      projections.find(_.semanticEquals(filter))
+    }
+
+    filter.toList match {
+      case head :: tail =>
+        checkSemanticEquals(head) match {
+          case Some(_) => extractUniqueAttributes(projections, tail)
+          case None => extractUniqueAttributes(projections :+ head, tail)
+        }
+      case Nil => projections
+    }
+  }
+
   protected def getRequestedColumns(relation: LogicalRelation,
       projectsAttr: Seq[Attribute],
       filterSet: AttributeSet,
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index f04985c..a54645d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -22,13 +22,11 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{execution, MixedFormatHandlerUtil, SparkSession}
 import 
org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
-import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, AttributeSet, Cast, Expression, ExpressionSet, 
NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -236,8 +234,8 @@ object MixedFormatHandler {
     LOGGER.info(s"Post-Scan Filters: ${ afterScanFilters.mkString(",") }")
     val filterAttributes = AttributeSet(afterScanFilters)
     val requiredExpressions = new util.LinkedHashSet[NamedExpression](
-        (projects.map(p => dataColumns.find(_.exprId == p.exprId).get) ++
-         filterAttributes.map(p => dataColumns.find(_.exprId == 
p.exprId).get)).asJava
+      (projects.flatMap(p => findAttribute(dataColumns, p)) ++
+       filterAttributes.map(p => 
dataColumns.find(_.exprId.equals(p.exprId)).get)).asJava
     ).asScala.toSeq
     val readDataColumns =
       
requiredExpressions.filterNot(partitionColumns.contains).asInstanceOf[Seq[Attribute]]
@@ -247,7 +245,7 @@ object MixedFormatHandler {
     val outputAttributes = readDataColumns ++ partitionColumns
 
     val scan =
-      FileSourceScanExec(
+      MixedFormatHandlerUtil.getScanForSegments(
         fsRelation,
         outputAttributes,
         outputSchema,
@@ -264,6 +262,33 @@ object MixedFormatHandler {
     (withProjections.inputRDDs().head, fileFormat.supportBatch(sparkSession, 
outputSchema))
   }
 
+  // This function is used to get the unique columns based on expression Id 
from
+  // filters and the projections list
+  def findAttribute(dataColumns: Seq[Attribute], p: Expression): 
Seq[Attribute] = {
+    dataColumns.find {
+      x =>
+        val attr = findAttributeReference(p)
+        attr.isDefined && x.exprId.equals(attr.get.exprId)
+    } match {
+      case Some(c) => Seq(c)
+      case None => Seq()
+    }
+  }
+
+  private def findAttributeReference(p: Expression): Option[NamedExpression] = 
{
+    p match {
+      case a: AttributeReference =>
+        Some(a)
+      case al =>
+        if (al.children.nonEmpty) {
+          al.children.map(findAttributeReference).head
+        } else {
+          None
+        }
+      case _ => None
+    }
+  }
+
   def getSegmentsToAccess(identifier: AbsoluteTableIdentifier): Seq[String] = {
     val carbonSessionInfo: CarbonSessionInfo = {
       var info = ThreadLocalSessionInfo.getCarbonSessionInfo
diff --git 
a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
 
b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
new file mode 100644
index 0000000..69ed477
--- /dev/null
+++ 
b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, 
HadoopFsRelation}
+import org.apache.spark.sql.types.StructType
+
+object MixedFormatHandlerUtil {
+
+  def getScanForSegments(
+      @transient relation: HadoopFsRelation,
+      output: Seq[Attribute],
+      outputSchema: StructType,
+      partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression],
+      tableIdentifier: Option[TableIdentifier]
+  ): FileSourceScanExec = {
+    val pushedDownFilters = 
dataFilters.flatMap(DataSourceStrategy.translateFilter)
+    FileSourceScanExec(
+      relation,
+      output,
+      outputSchema,
+      partitionFilters,
+      pushedDownFilters,
+      tableIdentifier)
+  }
+}

Reply via email to