[CARBONDATA-2989] Upgrade spark integration version to 2.3.2

1. According to SPARK-PR#22346, change the parameter type from
   'outputColumns: Seq[Attribute]' to 'outputColumnNames: Seq[String]'
   when calling the 'writeAndRead' method.
2. According to SPARK-PR#21815, several vals of FileSourceScanExec became
   'lazy', so move the original class 'CarbonDataSourceScan' to the src
   path 'commonTo2.1And2.2' and add a new 'CarbonDataSourceScan' in the
   src path 'spark2.3' whose overrides are declared 'lazy'.
3. Upgrade spark integration version to 2.3.2.
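For reference, item 1 condenses to the following call-site sketch
(excerpted from the CarbonReflectionUtils diff below; dataSourceObj, mode,
query and physicalPlan are already in scope at that call site):

    // Spark 2.3.2 (SPARK-PR#22346) changed DataSource.writeAndRead to take
    // the output column names instead of the output attributes.
    val method = dataSourceObj.getClass.getMethod("writeAndRead",
      classOf[SaveMode],
      classOf[LogicalPlan],
      classOf[Seq[String]],   // was classOf[Seq[Attribute]] before 2.3.2
      classOf[SparkPlan])
    // so pass query.output.map(_.name) rather than query.output
    method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
      .asInstanceOf[BaseRelation]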
This closes #2779


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2081bc87
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2081bc87
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2081bc87

Branch: refs/heads/branch-1.5
Commit: 2081bc87a5846055c861b28dfc1e3383c53e7ee0
Parents: 1c1ced3
Author: Zhang Zhichao <[email protected]>
Authored: Fri Sep 28 01:30:34 2018 +0800
Committer: chenliang613 <[email protected]>
Committed: Sun Sep 30 16:59:47 2018 +0800

----------------------------------------------------------------------
 .../testsuite/bigdecimal/TestBigDecimal.scala   |  2 +-
 .../spark/util/CarbonReflectionUtils.scala      |  6 +-
 integration/spark-datasource/pom.xml            |  2 +-
 integration/spark2/pom.xml                      |  5 +-
 .../strategy/CarbonDataSourceScan.scala         | 53 ++++++++++++++++++
 .../strategy/CarbonDataSourceScan.scala         | 53 ------------------
 .../strategy/CarbonDataSourceScan.scala         | 58 ++++++++++++++++++++
 pom.xml                                         |  4 +-
 8 files changed, 124 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
index 1f7aafe..551b00b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
@@ -45,7 +45,7 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("create table if not exists hiveTable(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Decimal(17,2))row format delimited fields terminated by ','")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalDataWithHeader.csv' into table carbonTable")
     sql(s"LOAD DATA local inpath '$resourcesPath/decimalDataWithoutHeader.csv' INTO table hiveTable")
-    sql("create table if not exists hiveBigDecimal(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10))row format delimited fields terminated by ','")
+    sql("create table if not exists hiveBigDecimal(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10))row format delimited fields terminated by ','")
     sql(s"LOAD DATA local inpath '$resourcesPath/decimalBoundaryDataHive.csv' INTO table hiveBigDecimal")
     sql("create table if not exists carbonBigDecimal_2 (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal_2")


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 9955286..0055e87 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -294,9 +294,11 @@ object CarbonReflectionUtils {
         .getMethod("writeAndRead",
           classOf[SaveMode],
           classOf[LogicalPlan],
-          classOf[Seq[Attribute]],
+          classOf[Seq[String]],
           classOf[SparkPlan])
-      method.invoke(dataSourceObj, mode, query, query.output, physicalPlan)
+      // since spark 2.3.2 version (SPARK-PR#22346),
+      // change 'query.output' to 'query.output.map(_.name)'
+      method.invoke(dataSourceObj, mode, query, query.output.map(_.name), physicalPlan)
         .asInstanceOf[BaseRelation]
     } else {
       throw new UnsupportedOperationException("Spark version not supported")


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark-datasource/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index 9f0d3ff..002b9f3 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -278,7 +278,7 @@
     <profile>
       <id>spark-2.3</id>
       <properties>
-        <spark.version>2.3.1</spark.version>
+        <spark.version>2.3.2</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 1eba780..b0d8bbe 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -283,6 +283,7 @@
               <configuration>
                 <sources>
                   <source>src/main/spark2.1</source>
+                  <source>src/main/commonTo2.1And2.2</source>
                 </sources>
               </configuration>
             </execution>
@@ -328,6 +329,7 @@
                 <sources>
                   <source>src/main/spark2.2</source>
                   <source>src/main/commonTo2.2And2.3</source>
+                  <source>src/main/commonTo2.1And2.2</source>
                 </sources>
               </configuration>
             </execution>
@@ -339,7 +341,7 @@
     <profile>
       <id>spark-2.3</id>
       <properties>
-        <spark.version>2.3.1</spark.version>
+        <spark.version>2.3.2</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
@@ -352,6 +354,7 @@
             <excludes>
               <exclude>src/main/spark2.1</exclude>
               <exclude>src/main/spark2.2</exclude>
+              <exclude>src/main/commonTo2.1And2.2</exclude>
             </excludes>
           </configuration>
         </plugin>


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
new file mode 100644
index 0000000..7605574
--- /dev/null
+++ b/integration/spark2/src/main/commonTo2.1And2.2/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+
+/**
+ * Physical plan node for scanning data. It is applied for both tables
+ * USING carbondata and STORED AS CARBONDATA.
+ */
+class CarbonDataSourceScan(
+    override val output: Seq[Attribute],
+    val rdd: RDD[InternalRow],
+    @transient override val relation: HadoopFsRelation,
+    val partitioning: Partitioning,
+    override val metadata: Map[String, String],
+    identifier: Option[TableIdentifier],
+    @transient private val logicalRelation: LogicalRelation)
+  extends FileSourceScanExec(
+    relation,
+    output,
+    relation.dataSchema,
+    Seq.empty,
+    Seq.empty,
+    identifier) {
+
+  override val supportsBatch: Boolean = true
+
+  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+    (partitioning, Nil)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
+
+}


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
deleted file mode 100644
index 7605574..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.strategy
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
-
-/**
- * Physical plan node for scanning data. It is applied for both tables
- * USING carbondata and STORED AS CARBONDATA.
- */
-class CarbonDataSourceScan(
-    override val output: Seq[Attribute],
-    val rdd: RDD[InternalRow],
-    @transient override val relation: HadoopFsRelation,
-    val partitioning: Partitioning,
-    override val metadata: Map[String, String],
-    identifier: Option[TableIdentifier],
-    @transient private val logicalRelation: LogicalRelation)
-  extends FileSourceScanExec(
-    relation,
-    output,
-    relation.dataSchema,
-    Seq.empty,
-    Seq.empty,
-    identifier) {
-
-  override val supportsBatch: Boolean = true
-
-  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
-    (partitioning, Nil)
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
-
-}


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
new file mode 100644
index 0000000..5435f04
--- /dev/null
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/execution/strategy/CarbonDataSourceScan.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+
+/**
+ * Physical plan node for scanning data. It is applied for both tables
+ * USING carbondata and STORED AS CARBONDATA.
+ */
+class CarbonDataSourceScan(
+    override val output: Seq[Attribute],
+    val rdd: RDD[InternalRow],
+    @transient override val relation: HadoopFsRelation,
+    val partitioning: Partitioning,
+    val md: Map[String, String],
+    identifier: Option[TableIdentifier],
+    @transient private val logicalRelation: LogicalRelation)
+  extends FileSourceScanExec(
+    relation,
+    output,
+    relation.dataSchema,
+    Seq.empty,
+    Seq.empty,
+    identifier) {
+
+  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+  override lazy val supportsBatch: Boolean = true
+
+  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) =
+    (partitioning, Nil)
+
+  // added lazy since spark 2.3.2 version (SPARK-PR#21815)
+  override lazy val metadata: Map[String, String] = md
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = rdd :: Nil
+
+}


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2081bc87/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 00a5287..7b1d487 100644
--- a/pom.xml
+++ b/pom.xml
@@ -522,6 +522,7 @@
             <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
             <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.1</sourceDirectory>
+            <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
             <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
             <sourceDirectory>${basedir}/integration/spark-common/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/integration/spark-common-test/src/main/scala</sourceDirectory>
@@ -582,6 +583,7 @@
             <sourceDirectory>${basedir}/hadoop/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/integration/spark2/src/main/scala</sourceDirectory>
            <sourceDirectory>${basedir}/integration/spark2/src/main/spark2.2</sourceDirectory>
+            <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.1And2.2</sourceDirectory>
             <sourceDirectory>${basedir}/integration/spark2/src/main/commonTo2.2And2.3</sourceDirectory>
             <sourceDirectory>${basedir}/integration/spark2/src/main/java</sourceDirectory>
             <sourceDirectory>${basedir}/integration/spark-common/src/main/scala</sourceDirectory>
@@ -608,7 +610,7 @@
     <profile>
       <id>spark-2.3</id>
       <properties>
-        <spark.version>2.3.1</spark.version>
+        <spark.version>2.3.2</spark.version>
         <scala.binary.version>2.11</scala.binary.version>
         <scala.version>2.11.8</scala.version>
       </properties>
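Why CarbonDataSourceScan has to exist in two variants: Scala 2 (which
Spark 2.x builds against) does not allow a strict val to override a lazy
val, nor a lazy val to override a strict val. SPARK-PR#21815 made
supportsBatch, outputPartitioning/outputOrdering and metadata lazy vals in
FileSourceScanExec, so the Spark 2.3.2 overrides must themselves be lazy,
while the Spark 2.1/2.2 overrides must stay strict; this is also why the
2.3.2 variant renames the constructor parameter to 'md' and forwards it
through 'override lazy val metadata'. A minimal standalone sketch (the
Base*/CarbonScan* names are hypothetical stand-ins, not the actual Spark
classes):

    // Stand-ins for the FileSourceScanExec shape before and after SPARK-PR#21815.
    class BaseStrict { val supportsBatch: Boolean = false }    // Spark 2.1/2.2 shape
    class BaseLazy { lazy val supportsBatch: Boolean = false } // Spark 2.3.2 shape

    // Compiles: strict val overriding a strict val (the commonTo2.1And2.2 class).
    class CarbonScanStrict extends BaseStrict {
      override val supportsBatch: Boolean = true
    }

    // Compiles: lazy val overriding a lazy val (the spark2.3 class).
    class CarbonScanLazy extends BaseLazy {
      override lazy val supportsBatch: Boolean = true
    }

    // Rejected by scalac ("must be declared lazy to override a lazy value"),
    // which is why one shared class cannot serve both Spark versions:
    // class CarbonScanBad extends BaseLazy {
    //   override val supportsBatch: Boolean = true
    // }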
