Repository: spark
Updated Branches:
  refs/heads/master e4d8f9a36 -> f76790557


http://git-wip-us.apache.org/repos/asf/spark/blob/f7679055/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
new file mode 100644
index 0000000..aa5cae3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -0,0 +1,731 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, _}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+case class TestData(key: Int, value: String)
+
+case class ThreeCloumntable(key: Int, value: String, key1: String)
+
+class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
+    with SQLTestUtils {
+  import spark.implicits._
+
+  override lazy val testData = spark.sparkContext.parallelize(
+    (1 to 100).map(i => TestData(i, i.toString))).toDF()
+
+  before {
+    // Since every we are doing tests for DDL statements,
+    // it is better to reset before every test.
+    hiveContext.reset()
+    // Creates a temporary view with testData, which will be used in all tests.
+    testData.createOrReplaceTempView("testData")
+  }
+
+  test("insertInto() HiveTable") {
+    withTable("createAndInsertTest") {
+      sql("CREATE TABLE createAndInsertTest (key int, value string)")
+
+      // Add some data.
+      testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
+
+      // Make sure the table has also been updated.
+      checkAnswer(
+        sql("SELECT * FROM createAndInsertTest"),
+        testData.collect().toSeq
+      )
+
+      // Add more data.
+      testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
+
+      // Make sure the table has been updated.
+      checkAnswer(
+        sql("SELECT * FROM createAndInsertTest"),
+        testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq
+      )
+
+      // Now overwrite.
+      testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
+
+      // Make sure the registered table has also been updated.
+      checkAnswer(
+        sql("SELECT * FROM createAndInsertTest"),
+        testData.collect().toSeq
+      )
+    }
+  }
+
+  test("Double create fails when allowExisting = false") {
+    withTable("doubleCreateAndInsertTest") {
+      sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+
+      intercept[AnalysisException] {
+        sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+      }
+    }
+  }
+
+  test("Double create does not fail when allowExisting = true") {
+    withTable("doubleCreateAndInsertTest") {
+      sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+      sql("CREATE TABLE IF NOT EXISTS doubleCreateAndInsertTest (key int, 
value string)")
+    }
+  }
+
+  test("SPARK-4052: scala.collection.Map as value type of MapType") {
+    val schema = StructType(StructField("m", MapType(StringType, StringType), 
true) :: Nil)
+    val rowRDD = spark.sparkContext.parallelize(
+      (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> 
s"value$i"))))
+    val df = spark.createDataFrame(rowRDD, schema)
+    df.createOrReplaceTempView("tableWithMapValue")
+    sql("CREATE TABLE hiveTableWithMapValue(m MAP <STRING, STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM 
tableWithMapValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithMapValue"),
+      rowRDD.collect().toSeq
+    )
+
+    sql("DROP TABLE hiveTableWithMapValue")
+  }
+
+  test("SPARK-4203:random partition directory order") {
+    sql("CREATE TABLE tmp_table (key int, value string)")
+    val tmpDir = Utils.createTempDir()
+    // The default value of hive.exec.stagingdir.
+    val stagingDir = ".hive-staging"
+
+    sql(
+      s"""
+         |CREATE TABLE table_with_partition(c1 string)
+         |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string)
+         |location '${tmpDir.toURI.toString}'
+        """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='1')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='2')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='3')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='4')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    def listFolders(path: File, acc: List[String]): List[List[String]] = {
+      val dir = path.listFiles()
+      val folders = dir.filter { e => e.isDirectory && 
!e.getName().startsWith(stagingDir) }.toList
+      if (folders.isEmpty) {
+        List(acc.reverse)
+      } else {
+        folders.flatMap(x => listFolders(x, x.getName :: acc))
+      }
+    }
+    val expected = List(
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=2"::Nil,
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=3"::Nil,
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=1"::Nil,
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=4"::Nil
+    )
+    assert(listFolders(tmpDir, List()).sortBy(_.toString()) === 
expected.sortBy(_.toString))
+    sql("DROP TABLE table_with_partition")
+    sql("DROP TABLE tmp_table")
+  }
+
+  testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { 
tableName =>
+    val selQuery = s"select a, b, c, d from $tableName"
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 1, 4
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(1, 2, 3, 4))
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 5, 6
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+
+    val e = intercept[AnalysisException] {
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $tableName
+           |partition (b=2, c) IF NOT EXISTS
+           |SELECT 7, 8, 3
+          """.stripMargin)
+    }
+    assert(e.getMessage.contains(
+      "Dynamic partitions do not support IF NOT EXISTS. Specified partitions 
with value: [c]"))
+
+    // If the partition already exists, the insert will overwrite the data
+    // unless users specify IF NOT EXISTS
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3) IF NOT EXISTS
+         |SELECT 9, 10
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+
+    // ADD PARTITION has the same effect, even if no actual data is inserted.
+    sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)")
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=21, c=31) IF NOT EXISTS
+         |SELECT 20, 24
+        """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+  }
+
+  test("Insert ArrayType.containsNull == false") {
+    val schema = StructType(Seq(
+      StructField("a", ArrayType(StringType, containsNull = false))))
+    val rowRDD = spark.sparkContext.parallelize((1 to 100).map(i => 
Row(Seq(s"value$i"))))
+    val df = spark.createDataFrame(rowRDD, schema)
+    df.createOrReplaceTempView("tableWithArrayValue")
+    sql("CREATE TABLE hiveTableWithArrayValue(a Array <STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM 
tableWithArrayValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithArrayValue"),
+      rowRDD.collect().toSeq)
+
+    sql("DROP TABLE hiveTableWithArrayValue")
+  }
+
+  test("Insert MapType.valueContainsNull == false") {
+    val schema = StructType(Seq(
+      StructField("m", MapType(StringType, StringType, valueContainsNull = 
false))))
+    val rowRDD = spark.sparkContext.parallelize(
+      (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i"))))
+    val df = spark.createDataFrame(rowRDD, schema)
+    df.createOrReplaceTempView("tableWithMapValue")
+    sql("CREATE TABLE hiveTableWithMapValue(m Map <STRING, STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM 
tableWithMapValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithMapValue"),
+      rowRDD.collect().toSeq)
+
+    sql("DROP TABLE hiveTableWithMapValue")
+  }
+
+  test("Insert StructType.fields.exists(_.nullable == false)") {
+    val schema = StructType(Seq(
+      StructField("s", StructType(Seq(StructField("f", StringType, nullable = 
false))))))
+    val rowRDD = spark.sparkContext.parallelize(
+      (1 to 100).map(i => Row(Row(s"value$i"))))
+    val df = spark.createDataFrame(rowRDD, schema)
+    df.createOrReplaceTempView("tableWithStructValue")
+    sql("CREATE TABLE hiveTableWithStructValue(s Struct <f: STRING>)")
+    sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM 
tableWithStructValue")
+
+    checkAnswer(
+      sql("SELECT * FROM hiveTableWithStructValue"),
+      rowRDD.collect().toSeq)
+
+    sql("DROP TABLE hiveTableWithStructValue")
+  }
+
+  test("Test partition mode = strict") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
+      withTable("partitioned") {
+        sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY 
(part string)")
+        val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" 
else "odd"))
+          .toDF("id", "data", "part")
+
+        intercept[SparkException] {
+          data.write.insertInto("partitioned")
+        }
+      }
+    }
+  }
+
+  test("Detect table partitioning") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      withTable("source", "partitioned") {
+        sql("CREATE TABLE source (id bigint, data string, part string)")
+        val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" 
else "odd")).toDF()
+
+        data.write.insertInto("source")
+        checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
+
+        sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY 
(part string)")
+        // this will pick up the output partitioning from the table definition
+        spark.table("source").write.insertInto("partitioned")
+
+        checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq)
+      }
+    }
+  }
+
+  private def testPartitionedHiveSerDeTable(testName: String)(f: String => 
Unit): Unit = {
+    test(s"Hive SerDe table - $testName") {
+      val hiveTable = "hive_table"
+
+      withTable(hiveTable) {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          sql(
+            s"""
+              |CREATE TABLE $hiveTable (a INT, d INT)
+              |PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE
+            """.stripMargin)
+          f(hiveTable)
+        }
+      }
+    }
+  }
+
+  private def testPartitionedDataSourceTable(testName: String)(f: String => 
Unit): Unit = {
+    test(s"Data source table - $testName") {
+      val dsTable = "ds_table"
+
+      withTable(dsTable) {
+        sql(
+          s"""
+             |CREATE TABLE $dsTable (a INT, b INT, c INT, d INT)
+             |USING PARQUET PARTITIONED BY (b, c)
+           """.stripMargin)
+        f(dsTable)
+      }
+    }
+  }
+
+  private def testPartitionedTable(testName: String)(f: String => Unit): Unit 
= {
+    testPartitionedHiveSerDeTable(testName)(f)
+    testPartitionedDataSourceTable(testName)(f)
+  }
+
+  testPartitionedTable("partitionBy() can't be used together with 
insertInto()") { tableName =>
+    val cause = intercept[AnalysisException] {
+      Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", 
"c").insertInto(tableName)
+    }
+
+    assert(cause.getMessage.contains("insertInto() can't be used together with 
partitionBy()."))
+  }
+
+  testPartitionedTable(
+    "SPARK-16036: better error message when insert into a table with mismatch 
schema") {
+    tableName =>
+      val e = intercept[AnalysisException] {
+        sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
+      }
+      assert(e.message.contains(
+        "target table has 4 column(s) but the inserted data has 5 column(s)"))
+  }
+
+  testPartitionedTable("SPARK-16037: INSERT statement should match columns by 
position") {
+    tableName =>
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+        checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+        sql(s"INSERT OVERWRITE TABLE $tableName SELECT 1, 4, 2, 3")
+        checkAnswer(sql(s"SELECT a, b, c, 4 FROM $tableName"), Row(1, 2, 3, 4))
+      }
+  }
+
+  testPartitionedTable("INSERT INTO a partitioned table (semantic and error 
handling)") {
+    tableName =>
+      withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+        sql(s"INSERT INTO TABLE $tableName PARTITION (b=2, c=3) SELECT 1, 4")
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (b=6, c=7) SELECT 5, 8")
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 
12")
+
+        // c is defined twice. Analyzer will complain.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, c=16) 
SELECT 13")
+        }
+
+        // d is not a partitioning column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) 
SELECT 13, 14")
+        }
+
+        // d is not a partitioning column. The total number of columns is 
correct.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c=15, d=16) 
SELECT 13")
+        }
+
+        // The data is missing a column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
+        }
+
+        // d is not a partitioning column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=15, d=15) SELECT 13, 
14")
+        }
+
+        // The statement is missing a column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14")
+        }
+
+        // The statement is missing a column.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b=15) SELECT 13, 14, 
16")
+        }
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (b=14, c) SELECT 13, 16, 
15")
+
+        // Dynamic partitioning columns need to be after static partitioning 
columns.
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION (b, c=19) SELECT 17, 
20, 18")
+        }
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (b, c) SELECT 17, 20, 18, 
19")
+
+        sql(s"INSERT INTO TABLE $tableName PARTITION (c, b) SELECT 21, 24, 22, 
23")
+
+        sql(s"INSERT INTO TABLE $tableName SELECT 25, 28, 26, 27")
+
+        checkAnswer(
+          sql(s"SELECT a, b, c, d FROM $tableName"),
+          Row(1, 2, 3, 4) ::
+            Row(5, 6, 7, 8) ::
+            Row(9, 10, 11, 12) ::
+            Row(13, 14, 15, 16) ::
+            Row(17, 18, 19, 20) ::
+            Row(21, 22, 23, 24) ::
+            Row(25, 26, 27, 28) :: Nil
+        )
+      }
+  }
+
+  testPartitionedTable("insertInto() should match columns by position and 
ignore column names") {
+    tableName =>
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        // Columns `df.c` and `df.d` are resolved by position, and thus mapped 
to partition columns
+        // `b` and `c` of the target table.
+        val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
+        df.write.insertInto(tableName)
+
+        checkAnswer(
+          sql(s"SELECT a, b, c, d FROM $tableName"),
+          Row(1, 3, 4, 2)
+        )
+      }
+  }
+
+  testPartitionedTable("insertInto() should match unnamed columns by 
position") {
+    tableName =>
+      withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        // Columns `c + 1` and `d + 1` are resolved by position, and thus 
mapped to partition
+        // columns `b` and `c` of the target table.
+        val df = Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d")
+        df.select('a + 1, 'b + 1, 'c + 1, 'd + 1).write.insertInto(tableName)
+
+        checkAnswer(
+          sql(s"SELECT a, b, c, d FROM $tableName"),
+          Row(2, 4, 5, 3)
+        )
+      }
+  }
+
+  testPartitionedTable("insertInto() should reject missing columns") {
+    tableName =>
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, b INT)")
+
+        intercept[AnalysisException] {
+          spark.table("t").write.insertInto(tableName)
+        }
+      }
+  }
+
+  testPartitionedTable("insertInto() should reject extra columns") {
+    tableName =>
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")
+
+        intercept[AnalysisException] {
+          spark.table("t").write.insertInto(tableName)
+        }
+      }
+  }
+
+  private def testBucketedTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Hive SerDe table - $testName") {
+      val hiveTable = "hive_table"
+
+      withTable(hiveTable) {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          sql(
+            s"""
+               |CREATE TABLE $hiveTable (a INT, d INT)
+               |PARTITIONED BY (b INT, c INT)
+               |CLUSTERED BY(a)
+               |SORTED BY(a, d) INTO 256 BUCKETS
+               |STORED AS TEXTFILE
+            """.stripMargin)
+          f(hiveTable)
+        }
+      }
+    }
+  }
+
+  testBucketedTable("INSERT should NOT fail if strict bucketing is NOT 
enforced") {
+    tableName =>
+      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" 
-> "false") {
+        sql(s"INSERT INTO TABLE $tableName SELECT 1, 4, 2 AS c, 3 AS b")
+        checkAnswer(sql(s"SELECT a, b, c, d FROM $tableName"), Row(1, 2, 3, 4))
+      }
+  }
+
+  testBucketedTable("INSERT should fail if strict bucketing / sorting is 
enforced") {
+    tableName =>
+      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" 
-> "false") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+      withSQLConf("hive.enforce.bucketing" -> "false", "hive.enforce.sorting" 
-> "true") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+      withSQLConf("hive.enforce.bucketing" -> "true", "hive.enforce.sorting" 
-> "true") {
+        intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName SELECT 1, 2, 3, 4")
+        }
+      }
+  }
+
+  test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") {
+    // Set hive.exec.stagingdir under the table directory without start with 
".".
+    withSQLConf("hive.exec.stagingdir" -> "./test") {
+      withTable("test_table") {
+        sql("CREATE TABLE test_table (key int)")
+        sql("INSERT OVERWRITE TABLE test_table SELECT 1")
+        checkAnswer(sql("SELECT * FROM test_table"), Row(1))
+      }
+    }
+  }
+
+  test("insert overwrite to dir from hive metastore table") {
+    withTempDir { dir =>
+      val path = dir.toURI.getPath
+
+      sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path}' SELECT * FROM src where 
key < 10")
+
+      sql(
+        s"""
+           |INSERT OVERWRITE LOCAL DIRECTORY '${path}'
+           |STORED AS orc
+           |SELECT * FROM src where key < 10
+         """.stripMargin)
+
+      // use orc data source to check the data of path is right.
+      withTempView("orc_source") {
+        sql(
+          s"""
+             |CREATE TEMPORARY VIEW orc_source
+             |USING org.apache.spark.sql.hive.orc
+             |OPTIONS (
+             |  PATH '${dir.getCanonicalPath}'
+             |)
+           """.stripMargin)
+
+        checkAnswer(
+          sql("select * from orc_source"),
+          sql("select * from src where key < 10"))
+      }
+    }
+  }
+
+  test("insert overwrite to local dir from temp table") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS 
str").createOrReplaceTempView("test_insert_table")
+
+      withTempDir { dir =>
+        val path = dir.toURI.getPath
+
+        sql(
+          s"""
+             |INSERT OVERWRITE LOCAL DIRECTORY '${path}'
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+
+        sql(
+          s"""
+             |INSERT OVERWRITE LOCAL DIRECTORY '${path}'
+             |STORED AS orc
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+
+        // use orc data source to check the data of path is right.
+        checkAnswer(
+          spark.read.orc(dir.getCanonicalPath),
+          sql("select * from test_insert_table"))
+      }
+    }
+  }
+
+  test("insert overwrite to dir from temp table") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS 
str").createOrReplaceTempView("test_insert_table")
+
+      withTempDir { dir =>
+        val pathUri = dir.toURI
+
+        sql(
+          s"""
+             |INSERT OVERWRITE DIRECTORY '${pathUri}'
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+
+        sql(
+          s"""
+             |INSERT OVERWRITE DIRECTORY '${pathUri}'
+             |STORED AS orc
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+
+        // use orc data source to check the data of path is right.
+        checkAnswer(
+          spark.read.orc(dir.getCanonicalPath),
+          sql("select * from test_insert_table"))
+      }
+    }
+  }
+
+  test("multi insert overwrite to dir") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS 
str").createOrReplaceTempView("test_insert_table")
+
+      withTempDir { dir =>
+        val pathUri = dir.toURI
+
+        withTempDir { dir2 =>
+          val pathUri2 = dir2.toURI
+
+          sql(
+            s"""
+               |FROM test_insert_table
+               |INSERT OVERWRITE DIRECTORY '${pathUri}'
+               |STORED AS orc
+               |SELECT id
+               |INSERT OVERWRITE DIRECTORY '${pathUri2}'
+               |STORED AS orc
+               |SELECT *
+             """.stripMargin)
+
+          // use orc data source to check the data of path is right.
+          checkAnswer(
+            spark.read.orc(dir.getCanonicalPath),
+            sql("select id from test_insert_table"))
+
+          checkAnswer(
+            spark.read.orc(dir2.getCanonicalPath),
+            sql("select * from test_insert_table"))
+        }
+      }
+    }
+  }
+
+  test("insert overwrite to dir to illegal path") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS 
str").createOrReplaceTempView("test_insert_table")
+
+      val e = intercept[IllegalArgumentException] {
+        sql(
+          s"""
+             |INSERT OVERWRITE LOCAL DIRECTORY 'abc://a'
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+      }.getMessage
+
+      assert(e.contains("Wrong FS: abc://a, expected: file:///"))
+    }
+  }
+
+  test("insert overwrite to dir with mixed syntax") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS 
str").createOrReplaceTempView("test_insert_table")
+
+      val e = intercept[ParseException] {
+        sql(
+          s"""
+             |INSERT OVERWRITE DIRECTORY 'file://tmp'
+             |USING json
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+      }.getMessage
+
+      assert(e.contains("mismatched input 'ROW'"))
+    }
+  }
+
+  test("insert overwrite to dir with multi inserts") {
+    withTempView("test_insert_table") {
+      spark.range(10).selectExpr("id", "id AS 
str").createOrReplaceTempView("test_insert_table")
+
+      val e = intercept[ParseException] {
+        sql(
+          s"""
+             |INSERT OVERWRITE DIRECTORY 'file://tmp2'
+             |USING json
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+             |INSERT OVERWRITE DIRECTORY 'file://tmp2'
+             |USING json
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+             |SELECT * FROM test_insert_table
+           """.stripMargin)
+      }.getMessage
+
+      assert(e.contains("mismatched input 'ROW'"))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to