Repository: spark Updated Branches: refs/heads/branch-1.4 10007fbe0 -> 90f304b0c
http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index 10d0ede..3bbc5b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest { }.flatten.reduceOption(_ && _) val forParquetDataSource = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters + case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters }.flatten.reduceOption(_ && _) forParquetTableScan.orElse(forParquetDataSource) @@ -350,7 +350,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before override protected def afterAll(): Unit = { sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString) } - + test("SPARK-6742: don't push down predicates which reference partition columns") { import sqlContext.implicits._ @@ -365,7 +365,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before path, Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext, Seq(AttributeReference("part", IntegerType, false)()) )) - + checkAnswer( df.filter("a = 1 or part = 1"), (1 to 3).map(i => Row(1, i, i.toString))) http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index b504842..7c371db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -119,7 +119,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { } // Decimals with precision above 18 are not yet supported - intercept[RuntimeException] { + intercept[Throwable] { withTempPath { dir => makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath) parquetFile(dir.getCanonicalPath).collect() @@ -127,7 +127,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { } // Unlimited-length decimals are not yet supported - intercept[RuntimeException] { + intercept[Throwable] { withTempPath { dir => makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath) parquetFile(dir.getCanonicalPath).collect() @@ -419,7 +419,7 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA test("SPARK-6330 regression test") { // In 1.3.0, save to fs other than file: without configuring core-site.xml would get: // IllegalArgumentException: Wrong FS: hdfs://..., expected: file:/// - intercept[java.io.FileNotFoundException] { + intercept[Throwable] { sqlContext.parquetFile("file:///nonexistent") } val errorMessage = intercept[Throwable] { http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index bea568e..138e197 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -39,7 +39,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { import sqlContext._ import sqlContext.implicits._ - val defaultPartitionName = "__NULL__" + val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__" test("column type inference") { def check(raw: String, literal: Literal): Unit = { @@ -252,9 +252,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { val parquetRelation = load( "org.apache.spark.sql.parquet", - Map( - "path" -> base.getCanonicalPath, - ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName)) + Map("path" -> base.getCanonicalPath)) parquetRelation.registerTempTable("t") @@ -297,9 +295,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { val parquetRelation = load( "org.apache.spark.sql.parquet", - Map( - "path" -> base.getCanonicalPath, - ParquetRelation2.DEFAULT_PARTITION_NAME -> defaultPartitionName)) + Map("path" -> base.getCanonicalPath)) parquetRelation.registerTempTable("t") http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala index c964b6d..fc90e3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructField("lowerCase", StringType), StructField("UPPERCase", DoubleType, nullable = false)))) { - ParquetRelation2.mergeMetastoreParquetSchema( + FSBasedParquetRelation.mergeMetastoreParquetSchema( StructType(Seq( StructField("lowercase", StringType), StructField("uppercase", DoubleType, nullable = false))), @@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructType(Seq( StructField("UPPERCase", DoubleType, nullable = false)))) { - ParquetRelation2.mergeMetastoreParquetSchema( + FSBasedParquetRelation.mergeMetastoreParquetSchema( StructType(Seq( StructField("uppercase", DoubleType, nullable = false))), @@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Metastore schema contains additional non-nullable fields. assert(intercept[Throwable] { - ParquetRelation2.mergeMetastoreParquetSchema( + FSBasedParquetRelation.mergeMetastoreParquetSchema( StructType(Seq( StructField("uppercase", DoubleType, nullable = false), StructField("lowerCase", BinaryType, nullable = false))), @@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Conflicting non-nullable field names intercept[Throwable] { - ParquetRelation2.mergeMetastoreParquetSchema( + FSBasedParquetRelation.mergeMetastoreParquetSchema( StructType(Seq(StructField("lower", StringType, nullable = false))), StructType(Seq(StructField("lowerCase", BinaryType)))) } @@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { StructField("firstField", StringType, nullable = true), StructField("secondField", StringType, nullable = true), StructField("thirdfield", StringType, nullable = true)))) { - ParquetRelation2.mergeMetastoreParquetSchema( + FSBasedParquetRelation.mergeMetastoreParquetSchema( StructType(Seq( StructField("firstfield", StringType, nullable = true), StructField("secondfield", StringType, nullable = true), @@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest { // Merge should fail if the Metastore contains any additional fields that are not // nullable. assert(intercept[Throwable] { - ParquetRelation2.mergeMetastoreParquetSchema( + FSBasedParquetRelation.mergeMetastoreParquetSchema( StructType(Seq( StructField("firstfield", StringType, nullable = true), StructField("secondfield", StringType, nullable = true), http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d754c8e..b0e82c8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -33,10 +33,10 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.hive.client._ -import org.apache.spark.sql.parquet.ParquetRelation2 +import org.apache.spark.sql.parquet.FSBasedParquetRelation import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode} +import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources} import org.apache.spark.util.Utils /* Implicit conversions */ @@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive // serialize the Metastore schema to JSON and pass it as a data source option because of the // evil case insensitivity issue, which is reconciled within `ParquetRelation2`. val parquetOptions = Map( - ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json, - ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString) + FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json, + FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString) val tableIdentifier = QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) @@ -238,13 +238,15 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { cachedDataSourceTables.getIfPresent(tableIdentifier) match { case null => None // Cache miss - case logical@LogicalRelation(parquetRelation: ParquetRelation2) => + case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) => // If we have the same paths, same schema, and same partition spec, // we will use the cached Parquet Relation. val useCached = parquetRelation.paths.toSet == pathsInMetastore.toSet && logical.schema.sameType(metastoreSchema) && - parquetRelation.maybePartitionSpec == partitionSpecInMetastore + parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse { + PartitionSpec(StructType(Nil), Array.empty[sources.Partition]) + } if (useCached) { Some(logical) @@ -256,7 +258,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive case other => logWarning( s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " + - s"as Parquet. However, we are getting a ${other} from the metastore cache. " + + s"as Parquet. However, we are getting a $other from the metastore cache. " + s"This cached entry will be invalidated.") cachedDataSourceTables.invalidate(tableIdentifier) None @@ -278,8 +280,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec)) val parquetRelation = cached.getOrElse { - val created = - LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive)) + val created = LogicalRelation( + new FSBasedParquetRelation( + paths.toArray, None, Some(partitionSpec), parquetOptions)(hive)) cachedDataSourceTables.put(tableIdentifier, created) created } @@ -290,8 +293,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive val cached = getCached(tableIdentifier, paths, metastoreSchema, None) val parquetRelation = cached.getOrElse { - val created = - LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive)) + val created = LogicalRelation( + new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive)) cachedDataSourceTables.put(tableIdentifier, created) created } http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 47c60f6..da5d203 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -21,21 +21,18 @@ import java.io.File import scala.collection.mutable.ArrayBuffer -import org.scalatest.BeforeAndAfterEach - import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.metastore.TableType -import org.apache.hadoop.hive.ql.metadata.Table import org.apache.hadoop.mapred.InvalidInputException +import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql._ -import org.apache.spark.util.Utils -import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.parquet.ParquetRelation2 +import org.apache.spark.sql.parquet.FSBasedParquetRelation import org.apache.spark.sql.sources.LogicalRelation +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils /** * Tests for persisting tables created though the data sources API into the metastore. @@ -582,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { ) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(p: ParquetRelation2) => // OK + case LogicalRelation(p: FSBasedParquetRelation) => // OK case _ => fail( "test_parquet_ctas should be converted to " + - s"${classOf[ParquetRelation2].getCanonicalName}") + s"${classOf[FSBasedParquetRelation].getCanonicalName}") } // Clenup and reset confs. http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index a5744cc..1d6393a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -19,16 +19,14 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.errors.DialectException -import org.apache.spark.sql.DefaultParserDialect -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} -import org.apache.spark.sql.hive.MetastoreRelation import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim} -import org.apache.spark.sql.parquet.ParquetRelation2 +import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation} +import org.apache.spark.sql.parquet.FSBasedParquetRelation import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ +import org.apache.spark.sql.{AnalysisException, DefaultParserDialect, QueryTest, Row, SQLConf} case class Nested1(f1: Nested2) case class Nested2(f2: Nested3) @@ -176,17 +174,17 @@ class SQLQuerySuite extends QueryTest { def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = { val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) relation match { - case LogicalRelation(r: ParquetRelation2) => + case LogicalRelation(r: FSBasedParquetRelation) => if (!isDataSourceParquet) { fail( s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " + - s"${ParquetRelation2.getClass.getCanonicalName}.") + s"${FSBasedParquetRelation.getClass.getCanonicalName}.") } case r: MetastoreRelation => if (isDataSourceParquet) { fail( - s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " + + s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " + s"${classOf[MetastoreRelation].getCanonicalName}.") } } @@ -596,7 +594,7 @@ class SQLQuerySuite extends QueryTest { sql(s"DROP TABLE $tableName") } } - + test("SPARK-5203 union with different decimal precision") { Seq.empty[(Decimal, Decimal)] .toDF("d1", "d2") http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index bf1121d..41bcbe8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -21,16 +21,15 @@ import java.io.File import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.{QueryTest, SQLConf} import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} import org.apache.spark.sql.hive.execution.HiveTableScan import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation} -import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan} -import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan} +import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode} import org.apache.spark.util.Utils // The data where the partitioning key exists only in the directory structure. @@ -292,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { ) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(p: ParquetRelation2) => // OK - case _ => - fail( - s"test_parquet_ctas should be converted to ${classOf[ParquetRelation2].getCanonicalName}") + case LogicalRelation(_: FSBasedParquetRelation) => // OK + case _ => fail( + "test_parquet_ctas should be converted to " + + s"${classOf[FSBasedParquetRelation].getCanonicalName}") } sql("DROP TABLE IF EXISTS test_parquet_ctas") @@ -316,12 +315,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") df.queryExecution.executedPlan match { - case ExecutedCommand( - InsertIntoDataSource( - LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK + case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[ParquetRelation2].getCanonicalName} and " + - s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." + + s"${classOf[FSBasedParquetRelation].getCanonicalName} and " + + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " + s"However, found a ${o.toString} ") } @@ -348,11 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") df.queryExecution.executedPlan match { - case ExecutedCommand( - InsertIntoDataSource( - LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK + case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + - s"${classOf[ParquetRelation2].getCanonicalName} and " + + s"${classOf[FSBasedParquetRelation].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." + s"However, found a ${o.toString} ") } @@ -383,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { assertResult(2) { analyzed.collect { - case r @ LogicalRelation(_: ParquetRelation2) => r + case r @ LogicalRelation(_: FSBasedParquetRelation) => r }.size } @@ -395,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase { // Converted test_parquet should be cached. catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match { case null => fail("Converted test_parquet should be cached in the cache.") - case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK + case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK case other => fail( "The cached test_parquet should be a Parquet Relation. " + @@ -693,7 +688,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str") val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int") - intercept[RuntimeException](df2.saveAsParquetFile(filePath)) + intercept[Throwable](df2.saveAsParquetFile(filePath)) val df3 = df2.toDF("str", "max_int") df3.saveAsParquetFile(filePath2) http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala deleted file mode 100644 index e8b48a0..0000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala +++ /dev/null @@ -1,523 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources - -import org.apache.hadoop.fs.Path - -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql._ -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.parquet.ParquetTest -import org.apache.spark.sql.types._ - -// TODO Don't extend ParquetTest -// This test suite extends ParquetTest for some convenient utility methods. These methods should be -// moved to some more general places, maybe QueryTest. -class FSBasedRelationSuite extends QueryTest with ParquetTest { - override val sqlContext: SQLContext = TestHive - - import sqlContext._ - import sqlContext.implicits._ - - val dataSchema = - StructType( - Seq( - StructField("a", IntegerType, nullable = false), - StructField("b", StringType, nullable = false))) - - val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b") - - val partitionedTestDF1 = (for { - i <- 1 to 3 - p2 <- Seq("foo", "bar") - } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") - - val partitionedTestDF2 = (for { - i <- 1 to 3 - p2 <- Seq("foo", "bar") - } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2") - - val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2) - - def checkQueries(df: DataFrame): Unit = { - // Selects everything - checkAnswer( - df, - for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) - - // Simple filtering and partition pruning - checkAnswer( - df.filter('a > 1 && 'p1 === 2), - for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2)) - - // Simple projection and filtering - checkAnswer( - df.filter('a > 1).select('b, 'a + 1), - for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1)) - - // Simple projection and partition pruning - checkAnswer( - df.filter('a > 1 && 'p1 < 2).select('b, 'p1), - for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1)) - - // Self-join - df.registerTempTable("t") - withTempTable("t") { - checkAnswer( - sql( - """SELECT l.a, r.b, l.p1, r.p2 - |FROM t l JOIN t r - |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2 - """.stripMargin), - for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) - } - } - - test("save()/load() - non-partitioned table - Overwrite") { - withTempPath { file => - testDF.save( - path = file.getCanonicalPath, - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite) - - testDF.save( - path = file.getCanonicalPath, - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite) - - checkAnswer( - load( - source = classOf[SimpleTextSource].getCanonicalName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), - testDF.collect()) - } - } - - test("save()/load() - non-partitioned table - Append") { - withTempPath { file => - testDF.save( - path = file.getCanonicalPath, - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite) - - testDF.save( - path = file.getCanonicalPath, - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Append) - - checkAnswer( - load( - source = classOf[SimpleTextSource].getCanonicalName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)).orderBy("a"), - testDF.unionAll(testDF).orderBy("a").collect()) - } - } - - test("save()/load() - non-partitioned table - ErrorIfExists") { - withTempDir { file => - intercept[RuntimeException] { - testDF.save( - path = file.getCanonicalPath, - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.ErrorIfExists) - } - } - } - - test("save()/load() - non-partitioned table - Ignore") { - withTempDir { file => - testDF.save( - path = file.getCanonicalPath, - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Ignore) - - val path = new Path(file.getCanonicalPath) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - assert(fs.listStatus(path).isEmpty) - } - } - - test("save()/load() - partitioned table - simple queries") { - withTempPath { file => - partitionedTestDF.save( - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.ErrorIfExists, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - checkQueries( - load( - source = classOf[SimpleTextSource].getCanonicalName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json))) - } - } - - test("save()/load() - partitioned table - simple queries - partition columns in data") { - withTempDir { file => - val basePath = new Path(file.getCanonicalPath) - val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) - val qualifiedBasePath = fs.makeQualified(basePath) - - for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { - val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") - sparkContext - .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1") - .saveAsTextFile(partitionDir.toString) - } - - val dataSchemaWithPartition = - StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) - - checkQueries( - load( - source = classOf[SimpleTextSource].getCanonicalName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchemaWithPartition.json))) - } - } - - test("save()/load() - partitioned table - Overwrite") { - withTempPath { file => - partitionedTestDF.save( - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.save( - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - checkAnswer( - load( - source = classOf[SimpleTextSource].getCanonicalName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), - partitionedTestDF.collect()) - } - } - - test("save()/load() - partitioned table - Append") { - withTempPath { file => - partitionedTestDF.save( - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.save( - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Append, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - checkAnswer( - load( - source = classOf[SimpleTextSource].getCanonicalName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), - partitionedTestDF.unionAll(partitionedTestDF).collect()) - } - } - - test("save()/load() - partitioned table - Append - new partition values") { - withTempPath { file => - partitionedTestDF1.save( - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF2.save( - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Append, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - checkAnswer( - load( - source = classOf[SimpleTextSource].getCanonicalName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), - partitionedTestDF.collect()) - } - } - - test("save()/load() - partitioned table - ErrorIfExists") { - withTempDir { file => - intercept[RuntimeException] { - partitionedTestDF.save( - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.ErrorIfExists, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - } - } - } - - test("save()/load() - partitioned table - Ignore") { - withTempDir { file => - partitionedTestDF.save( - path = file.getCanonicalPath, - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Ignore) - - val path = new Path(file.getCanonicalPath) - val fs = path.getFileSystem(SparkHadoopUtil.get.conf) - assert(fs.listStatus(path).isEmpty) - } - } - - def withTable(tableName: String)(f: => Unit): Unit = { - try f finally sql(s"DROP TABLE $tableName") - } - - test("saveAsTable()/load() - non-partitioned table - Overwrite") { - testDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - Map("dataSchema" -> dataSchema.json)) - - withTable("t") { - checkAnswer(table("t"), testDF.collect()) - } - } - - test("saveAsTable()/load() - non-partitioned table - Append") { - testDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite) - - testDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Append) - - withTable("t") { - checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect()) - } - } - - test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - intercept[AnalysisException] { - testDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.ErrorIfExists) - } - } - } - - test("saveAsTable()/load() - non-partitioned table - Ignore") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - testDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Ignore) - - assert(table("t").collect().isEmpty) - } - } - - test("saveAsTable()/load() - partitioned table - simple queries") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - Map("dataSchema" -> dataSchema.json)) - - withTable("t") { - checkQueries(table("t")) - } - } - - test("saveAsTable()/load() - partitioned table - Overwrite") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - withTable("t") { - checkAnswer(table("t"), partitionedTestDF.collect()) - } - } - - test("saveAsTable()/load() - partitioned table - Append") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - withTable("t") { - checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect()) - } - } - - test("saveAsTable()/load() - partitioned table - Append - new partition values") { - partitionedTestDF1.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF2.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - withTable("t") { - checkAnswer(table("t"), partitionedTestDF.collect()) - } - } - - test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") { - partitionedTestDF1.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - // Using only a subset of all partition columns - intercept[Throwable] { - partitionedTestDF2.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1")) - } - - // Using different order of partition columns - intercept[Throwable] { - partitionedTestDF2.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p2", "p1")) - } - } - - test("saveAsTable()/load() - partitioned table - ErrorIfExists") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - intercept[AnalysisException] { - partitionedTestDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.ErrorIfExists, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - } - } - } - - test("saveAsTable()/load() - partitioned table - Ignore") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Ignore, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - assert(table("t").collect().isEmpty) - } - } - - test("Hadoop style globbing") { - withTempPath { file => - partitionedTestDF.save( - source = classOf[SimpleTextSource].getCanonicalName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - val df = load( - source = classOf[SimpleTextSource].getCanonicalName, - options = Map( - "path" -> s"${file.getCanonicalPath}/p1=*/p2=???", - "dataSchema" -> dataSchema.json)) - - val expectedPaths = Set( - s"${file.getCanonicalFile}/p1=1/p2=foo", - s"${file.getCanonicalFile}/p1=2/p2=foo", - s"${file.getCanonicalFile}/p1=1/p2=bar", - s"${file.getCanonicalFile}/p1=2/p2=bar" - ).map { p => - val path = new Path(p) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString - } - - val actualPaths = df.queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: FSBasedRelation) => - relation.paths.toSet - }.getOrElse { - fail("Expect an FSBasedRelation, but none could be found") - } - - assert(actualPaths === expectedPaths) - checkAnswer(df, partitionedTestDF.collect()) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/90f304b0/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala new file mode 100644 index 0000000..394833f --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql._ +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.parquet.ParquetTest +import org.apache.spark.sql.types._ + +// TODO Don't extend ParquetTest +// This test suite extends ParquetTest for some convenient utility methods. These methods should be +// moved to some more general places, maybe QueryTest. +class FSBasedRelationTest extends QueryTest with ParquetTest { + override val sqlContext: SQLContext = TestHive + + import sqlContext._ + import sqlContext.implicits._ + + val dataSourceName = classOf[SimpleTextSource].getCanonicalName + + val dataSchema = + StructType( + Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", StringType, nullable = false))) + + val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b") + + val partitionedTestDF1 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") + + val partitionedTestDF2 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2") + + val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2) + + def checkQueries(df: DataFrame): Unit = { + // Selects everything + checkAnswer( + df, + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + + // Simple filtering and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 === 2), + for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2)) + + // Simple projection and filtering + checkAnswer( + df.filter('a > 1).select('b, 'a + 1), + for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1)) + + // Simple projection and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 < 2).select('b, 'p1), + for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1)) + + // Self-join + df.registerTempTable("t") + withTempTable("t") { + checkAnswer( + sql( + """SELECT l.a, r.b, l.p1, r.p2 + |FROM t l JOIN t r + |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2 + """.stripMargin), + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + } + } + + test("save()/load() - non-partitioned table - Overwrite") { + withTempPath { file => + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Overwrite) + + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Overwrite) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + testDF.collect()) + } + } + + test("save()/load() - non-partitioned table - Append") { + withTempPath { file => + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Overwrite) + + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Append) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)).orderBy("a"), + testDF.unionAll(testDF).orderBy("a").collect()) + } + } + + test("save()/load() - non-partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[RuntimeException] { + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.ErrorIfExists) + } + } + } + + test("save()/load() - non-partitioned table - Ignore") { + withTempDir { file => + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Ignore) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + assert(fs.listStatus(path).isEmpty) + } + } + + test("save()/load() - partitioned table - simple queries") { + withTempPath { file => + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.ErrorIfExists, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkQueries( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json))) + } + } + + test("save()/load() - partitioned table - Overwrite") { + withTempPath { file => + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - Append") { + withTempPath { file => + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Append, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.unionAll(partitionedTestDF).collect()) + } + } + + test("save()/load() - partitioned table - Append - new partition values") { + withTempPath { file => + partitionedTestDF1.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF2.save( + source = dataSourceName, + mode = SaveMode.Append, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[RuntimeException] { + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.ErrorIfExists, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + } + } + } + + test("save()/load() - partitioned table - Ignore") { + withTempDir { file => + partitionedTestDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Ignore) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(SparkHadoopUtil.get.conf) + assert(fs.listStatus(path).isEmpty) + } + } + + def withTable(tableName: String)(f: => Unit): Unit = { + try f finally sql(s"DROP TABLE $tableName") + } + + test("saveAsTable()/load() - non-partitioned table - Overwrite") { + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + Map("dataSchema" -> dataSchema.json)) + + withTable("t") { + checkAnswer(table("t"), testDF.collect()) + } + } + + test("saveAsTable()/load() - non-partitioned table - Append") { + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite) + + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append) + + withTable("t") { + checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect()) + } + } + + test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.ErrorIfExists) + } + } + } + + test("saveAsTable()/load() - non-partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Ignore) + + assert(table("t").collect().isEmpty) + } + } + + test("saveAsTable()/load() - partitioned table - simple queries") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + Map("dataSchema" -> dataSchema.json)) + + withTable("t") { + checkQueries(table("t")) + } + } + + test("saveAsTable()/load() - partitioned table - Overwrite") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - new partition values") { + partitionedTestDF1.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF2.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") { + partitionedTestDF1.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + // Using only a subset of all partition columns + intercept[Throwable] { + partitionedTestDF2.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1")) + } + + // Using different order of partition columns + intercept[Throwable] { + partitionedTestDF2.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p2", "p1")) + } + } + + test("saveAsTable()/load() - partitioned table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.ErrorIfExists, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + } + } + } + + test("saveAsTable()/load() - partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Ignore, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + assert(table("t").collect().isEmpty) + } + } + + test("Hadoop style globbing") { + withTempPath { file => + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + val df = load( + source = dataSourceName, + options = Map( + "path" -> s"${file.getCanonicalPath}/p1=*/p2=???", + "dataSchema" -> dataSchema.json)) + + val expectedPaths = Set( + s"${file.getCanonicalFile}/p1=1/p2=foo", + s"${file.getCanonicalFile}/p1=2/p2=foo", + s"${file.getCanonicalFile}/p1=1/p2=bar", + s"${file.getCanonicalFile}/p1=2/p2=bar" + ).map { p => + val path = new Path(p) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString + } + + val actualPaths = df.queryExecution.analyzed.collectFirst { + case LogicalRelation(relation: FSBasedRelation) => + relation.paths.toSet + }.getOrElse { + fail("Expect an FSBasedRelation, but none could be found") + } + + assert(actualPaths === expectedPaths) + checkAnswer(df, partitionedTestDF.collect()) + } + } +} + +class SimpleTextRelationSuite extends FSBasedRelationTest { + override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName + + import sqlContext._ + + test("save()/load() - partitioned table - simple queries - partition columns in data") { + withTempDir { file => + val basePath = new Path(file.getCanonicalPath) + val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) + val qualifiedBasePath = fs.makeQualified(basePath) + + for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { + val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") + sparkContext + .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1") + .saveAsTextFile(partitionDir.toString) + } + + val dataSchemaWithPartition = + StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) + + checkQueries( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchemaWithPartition.json))) + } + } +} + +class FSBasedParquetRelationSuite extends FSBasedRelationTest { + override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName + + import sqlContext._ + import sqlContext.implicits._ + + test("save()/load() - partitioned table - simple queries - partition columns in data") { + withTempDir { file => + val basePath = new Path(file.getCanonicalPath) + val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) + val qualifiedBasePath = fs.makeQualified(basePath) + + for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { + val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") + sparkContext + .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1)) + .toDF("a", "b", "p1") + .saveAsParquetFile(partitionDir.toString) + } + + val dataSchemaWithPartition = + StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) + + checkQueries( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchemaWithPartition.json))) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
