This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 26755cd [CARBONDATA-3516] Fixed compilation issue for mixed formats in Spark-2.1
26755cd is described below
commit 26755cd616c56b92f74d0a22537eb79a980d85d4
Author: manishnalla1994 <[email protected]>
AuthorDate: Fri Oct 4 19:06:47 2019 +0530
[CARBONDATA-3516] Fixed compilation issue for mixed formats in Spark-2.1
Problem: Compilation fails with Spark-2.1 because FileSourceScanExec takes different constructor parameters in Spark-2.1 and Spark-2.3.
Solution: Decoupled the code for Spark-2.1 and Spark-2.2/2.3.
This closes #3406
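
As context for the patch below: a minimal, Spark-free sketch of the decoupling pattern, assuming stand-in types rather than the real Catalyst/Spark classes. The planner calls a single getScanForSegments entry point, and each build profile (spark2.1 vs. commonTo2.2And2.3) supplies its own MixedFormatHandlerUtil; the 2.1 flavour translates data filters before building the scan, mirroring the DataSourceStrategy.translateFilter call in the real shim.

// Illustrative stand-ins only; the real shims construct Spark's FileSourceScanExec.
case class ScanRequest(columns: Seq[String], dataFilters: Seq[String])
case class FileScan(columns: Seq[String], filters: Seq[String])

// commonTo2.2And2.3 flavour: data filters are passed through unchanged.
object ShimFor2_3 {
  def getScanForSegments(req: ScanRequest): FileScan =
    FileScan(req.columns, req.dataFilters)
}

// spark2.1 flavour: data filters are first translated to source-level filters.
object ShimFor2_1 {
  private def translateFilter(expr: String): Option[String] =
    Option(expr).filter(_.nonEmpty).map(e => s"translated($e)")

  def getScanForSegments(req: ScanRequest): FileScan =
    FileScan(req.columns, req.dataFilters.flatMap(translateFilter))
}

object ShimDemo extends App {
  val req = ScanRequest(Seq("empname", "deptno"), Seq("deptno = 10"))
  println(ShimFor2_3.getScanForSegments(req)) // filters kept as expressions
  println(ShimFor2_1.getScanForSegments(req)) // filters translated first
}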
---
.../testsuite/addsegment/AddSegmentTestCase.scala | 49 ++++++++++++----------
.../apache/spark/sql/MixedFomatHandlerUtil.scala | 43 +++++++++++++++++++
.../strategy/CarbonLateDecodeStrategy.scala | 29 ++++++++++++-
.../execution/strategy/MixedFormatHandler.scala | 39 +++++++++++++----
.../apache/spark/sql/MixedFormatHandlerUtil.scala | 44 +++++++++++++++++++
5 files changed, 174 insertions(+), 30 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 8358b1b..e39272a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -21,6 +21,7 @@ import java.nio.file.{Files, Paths}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -287,11 +288,12 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""insert into addsegment2 select * from addsegment1""")
sql("select * from addsegment2").show()
- val table = sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier("addsegment2"))
- val path = table.location.getPath
+ val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("addsegment2"))
+ val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
- copy(path, newPath)
+ copy(path.toString, newPath)
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')").show()
@@ -347,6 +349,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select empname from addsegment1 where empname='arvind'"),
Seq(Row("arvind"),Row("arvind"),Row("arvind")))
checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(30)))
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+ assert(sql("select deptname, deptno from addsegment1 where empname = 'arvind'")
+ .collect().length == 3)
FileFactory.deleteAllFilesOfDir(new File(newPath1))
FileFactory.deleteAllFilesOfDir(new File(newPath2))
}
@@ -438,11 +442,12 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""insert into addsegment2 select * from addsegment1""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
- val table = sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier("addsegment2"))
- val path = table.location.getPath
+ val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("addsegment2"))
+ val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
- copy(path, newPath)
+ copy(path.toString, newPath)
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')").show()
@@ -478,12 +483,12 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
sql(s"""insert into addsegment2 select * from addsegment1""")
- val table = sqlContext.sparkSession.sessionState.catalog
+ val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addsegment2"))
- val path = table.location.getPath
+ val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
- copy(path, newPath)
+ copy(path.toString, newPath)
val res1 = sql("select empname, deptname from addsegment1 where deptno=10")
@@ -529,11 +534,12 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"""insert into addsegment2 select * from addsegment1""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
- val table = sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier("addsegment2"))
- val path = table.location.getPath
+ val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier("addsegment2"))
+ val path = table.location
val newPath = storeLocation + "/" + "addsegtest"
FileFactory.deleteAllFilesOfDir(new File(newPath))
- copy(path, newPath)
+ copy(path.toString, newPath)
checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='parquet')").show()
@@ -558,18 +564,18 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
sql("insert into addSegParless values (3)")
sql("insert into addSegParmore values (4,'c', 'x')")
- val table1 = sqlContext.sparkSession.sessionState.catalog
+ val table1 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addSegPar"))
- val table2 = sqlContext.sparkSession.sessionState.catalog
+ val table2 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addSegParless"))
- val table3 = sqlContext.sparkSession.sessionState.catalog
+ val table3 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
.getTableMetadata(TableIdentifier("addSegParmore"))
- sql(s"alter table addSegCar add segment options('path'='${table1.location.getPath}', 'format'='parquet')")
+ sql(s"alter table addSegCar add segment options('path'='${table1.location}', 'format'='parquet')")
intercept[Exception] {
- sql(s"alter table addSegCar add segment options('path'='${table2.location.getPath}', 'format'='parquet')")
+ sql(s"alter table addSegCar add segment options('path'='${table2.location}', 'format'='parquet')")
}
- sql(s"alter table addSegCar add segment options('path'='${table3.location.getPath}', 'format'='parquet')")
+ sql(s"alter table addSegCar add segment options('path'='${table3.location}', 'format'='parquet')")
assert(sql("select * from addSegCar").collect().length == 3)
@@ -580,11 +586,12 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
}
private def copyseg(tableName: String, pathName: String): String = {
- val table1 = sqlContext.sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
- val path1 = table1.location.getPath
+ val table1 = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+ .getTableMetadata(TableIdentifier(tableName))
+ val path1 = table1.location
val newPath1 = storeLocation + "/" + pathName
FileFactory.deleteAllFilesOfDir(new File(newPath1))
- copy(path1, newPath1)
+ copy(path1.toString, newPath1)
newPath1
}
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
new file mode 100644
index 0000000..d180cd3
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/MixedFomatHandlerUtil.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.types.StructType
+
+object MixedFormatHandlerUtil {
+
+ def getScanForSegments(
+ @transient relation: HadoopFsRelation,
+ output: Seq[Attribute],
+ outputSchema: StructType,
+ partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression],
+ tableIdentifier: Option[TableIdentifier]
+ ): FileSourceScanExec = {
+ FileSourceScanExec(
+ relation,
+ output,
+ outputSchema,
+ partitionFilters,
+ dataFilters,
+ tableIdentifier)
+ }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 52411d1..3f6b91d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -434,15 +434,20 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
// revert for row scan
updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
}
+ val newRequestedColumns = if (!vectorPushRowFilters && extraRdd.isDefined) {
+ extractUniqueAttributes(projectsAttr, filterSet.toSeq)
+ } else {
+ updateRequestedColumns.asInstanceOf[Seq[Attribute]]
+ }
val scan = getDataSourceScan(relation,
- (updateRequestedColumns.asInstanceOf[Seq[Attribute]], partitions),
+ (newRequestedColumns, partitions),
scanBuilder,
candidatePredicates,
pushedFilters,
handledFilters,
metadata,
needDecoder,
- updateRequestedColumns.asInstanceOf[Seq[Attribute]], extraRdd)
+ newRequestedColumns, extraRdd)
// Check whether spark should handle row filters in case of vector flow.
if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
&& !implicitExisted && !hasDictionaryFilterCols && !hasMoreDictionaryCols) {
@@ -473,6 +478,26 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
}
+ /*
+ This function is used to get the Unique attributes from filter set
+ and projection set based on their semantics.
+ */
+ private def extractUniqueAttributes(projections: Seq[Attribute],
+ filter: Seq[Attribute]): Seq[Attribute] = {
+ def checkSemanticEquals(filter: Attribute): Option[Attribute] = {
+ projections.find(_.semanticEquals(filter))
+ }
+
+ filter.toList match {
+ case head :: tail =>
+ checkSemanticEquals(head) match {
+ case Some(_) => extractUniqueAttributes(projections, tail)
+ case None => extractUniqueAttributes(projections :+ head, tail)
+ }
+ case Nil => projections
+ }
+ }
+
protected def getRequestedColumns(relation: LogicalRelation,
projectsAttr: Seq[Attribute],
filterSet: AttributeSet,
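
As an editorial aside on the hunk above: extractUniqueAttributes walks the filter attributes recursively, skipping any that are already covered by the projection (semanticEquals) and appending the rest. A plain-string sketch, with case-insensitive equality standing in for Attribute.semanticEquals:

object ExtractUniqueSketch extends App {
  // Strings replace Catalyst Attributes; the logic mirrors extractUniqueAttributes above.
  def extractUnique(projections: Seq[String], filter: Seq[String]): Seq[String] =
    filter.toList match {
      case head :: tail if projections.exists(_.equalsIgnoreCase(head)) =>
        extractUnique(projections, tail)          // already projected, skip
      case head :: tail =>
        extractUnique(projections :+ head, tail)  // uncovered filter column, append
      case Nil => projections
    }

  // Projection (empname, deptname) plus filter columns (deptno, EMPNAME):
  println(extractUnique(Seq("empname", "deptname"), Seq("deptno", "EMPNAME")))
  // List(empname, deptname, deptno)
}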
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index f04985c..a54645d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -22,13 +22,11 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{execution, MixedFormatHandlerUtil, SparkSession}
import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileFormat
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
-import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Cast, Expression, ExpressionSet, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -236,8 +234,8 @@ object MixedFormatHandler {
LOGGER.info(s"Post-Scan Filters: ${ afterScanFilters.mkString(",") }")
val filterAttributes = AttributeSet(afterScanFilters)
val requiredExpressions = new util.LinkedHashSet[NamedExpression](
- (projects.map(p => dataColumns.find(_.exprId == p.exprId).get) ++
- filterAttributes.map(p => dataColumns.find(_.exprId == p.exprId).get)).asJava
+ (projects.flatMap(p => findAttribute(dataColumns, p)) ++
+ filterAttributes.map(p => dataColumns.find(_.exprId.equals(p.exprId)).get)).asJava
).asScala.toSeq
val readDataColumns =
requiredExpressions.filterNot(partitionColumns.contains).asInstanceOf[Seq[Attribute]]
@@ -247,7 +245,7 @@ object MixedFormatHandler {
val outputAttributes = readDataColumns ++ partitionColumns
val scan =
- FileSourceScanExec(
+ MixedFormatHandlerUtil.getScanForSegments(
fsRelation,
outputAttributes,
outputSchema,
@@ -264,6 +262,33 @@ object MixedFormatHandler {
(withProjections.inputRDDs().head, fileFormat.supportBatch(sparkSession, outputSchema))
}
+ // This function is used to get the unique columns based on expression Id from
+ // filters and the projections list
+ def findAttribute(dataColumns: Seq[Attribute], p: Expression): Seq[Attribute] = {
+ dataColumns.find {
+ x =>
+ val attr = findAttributeReference(p)
+ attr.isDefined && x.exprId.equals(attr.get.exprId)
+ } match {
+ case Some(c) => Seq(c)
+ case None => Seq()
+ }
+ }
+
+ private def findAttributeReference(p: Expression): Option[NamedExpression] = {
+ p match {
+ case a: AttributeReference =>
+ Some(a)
+ case al =>
+ if (al.children.nonEmpty) {
+ al.children.map(findAttributeReference).head
+ } else {
+ None
+ }
+ case _ => None
+ }
+ }
+
def getSegmentsToAccess(identifier: AbsoluteTableIdentifier): Seq[String] = {
val carbonSessionInfo: CarbonSessionInfo = {
var info = ThreadLocalSessionInfo.getCarbonSessionInfo
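
Another editorial sketch, not part of the patch: findAttributeReference descends through an expression's first child (e.g. under an Alias or Cast) until it reaches a named attribute, and findAttribute then matches it against dataColumns by exprId. A toy ADT version, with made-up node types in place of Catalyst expressions:

object FindAttributeSketch extends App {
  // Toy expression tree; Named stands in for AttributeReference, Wrap for Alias/Cast/etc.
  sealed trait Expr { def children: Seq[Expr] }
  case class Named(name: String, exprId: Long) extends Expr { val children: Seq[Expr] = Nil }
  case class Wrap(child: Expr) extends Expr { val children: Seq[Expr] = Seq(child) }

  def findAttributeReference(e: Expr): Option[Named] = e match {
    case n: Named => Some(n)
    case other if other.children.nonEmpty => findAttributeReference(other.children.head)
    case _ => None
  }

  def findAttribute(dataColumns: Seq[Named], p: Expr): Seq[Named] =
    findAttributeReference(p) match {
      case Some(attr) => dataColumns.filter(_.exprId == attr.exprId).take(1)
      case None => Seq.empty
    }

  val columns = Seq(Named("empname", 1L), Named("deptno", 2L))
  // An aliased or cast column still resolves to the underlying attribute by exprId:
  println(findAttribute(columns, Wrap(Wrap(Named("deptno", 2L))))) // List(Named(deptno,2))
}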
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
new file mode 100644
index 0000000..69ed477
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/MixedFormatHandlerUtil.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation}
+import org.apache.spark.sql.types.StructType
+
+object MixedFormatHandlerUtil {
+
+ def getScanForSegments(
+ @transient relation: HadoopFsRelation,
+ output: Seq[Attribute],
+ outputSchema: StructType,
+ partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression],
+ tableIdentifier: Option[TableIdentifier]
+ ): FileSourceScanExec = {
+ val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+ FileSourceScanExec(
+ relation,
+ output,
+ outputSchema,
+ partitionFilters,
+ pushedDownFilters,
+ tableIdentifier)
+ }
+}