Repository: spark
Updated Branches:
  refs/heads/master bf42c2db5 -> 476b34c23


[SPARK-18752][HIVE] "isSrcLocal" value should be set from user query.

The value of the "isSrcLocal" parameter passed to Hive's loadTable and
loadPartition methods needs to be set according to the user query (e.g.
"LOAD DATA LOCAL" vs. "LOAD DATA"), instead of being guessed by comparing
the input path's file system to the local one, as the current code does.
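
Concretely, the flag now flows straight from the parsed query down to the
catalog. A minimal sketch, simplified from the LoadDataCommand change below
(the surrounding condition is paraphrased; "isLocal" is the flag the parser
derives from the presence of the LOCAL keyword):

    // Sketch of LoadDataCommand.run(), simplified from this patch.
    if (partition.nonEmpty) {
      catalog.loadPartition(
        targetTable.identifier,
        loadPath.toString,
        partition.get,
        isOverwrite,
        holdDDLTime = false,
        inheritTableSpecs = true,
        isSrcLocal = isLocal)  // propagated from the query, not guessed
    } else {
      catalog.loadTable(
        targetTable.identifier,
        loadPath.toString,
        isOverwrite,
        holdDDLTime = false,
        isSrcLocal = isLocal)
    }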

For existing versions of Hive the current behavior is probably ok, but
some recent changes in the Hive code changed the semantics slightly:
with them, code that incorrectly sets "isSrcLocal" to "true" ends up
moving the parent directory of the source files into the final location,
instead of the files themselves, resulting in a table that cannot be read.
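
For reference, this is the heuristic being removed (see the HiveShim change
below); it inferred "local" by comparing file systems, which says nothing
about whether the user actually wrote LOCAL in the query:

    // Removed from Shim_v0_14: guesses "isSrcLocal" from the path's
    // file system. A plain "LOAD DATA" pointing at a local-FS path is
    // misclassified as local, which triggers the wrong move semantics
    // in newer Hive versions.
    protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = {
      val localFs = FileSystem.getLocal(conf)
      val pathFs = FileSystem.get(path.toUri(), conf)
      localFs.getUri() == pathFs.getUri()
    }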

I modified HiveCommandSuite so that the existing "LOAD DATA" tests run in
both local and non-local mode, since the semantics are slightly different.
The tests include a few new checks to make sure the semantics follow
what Hive describes in its documentation.
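
The key difference the new tests pin down is that local mode copies the
source file while non-local mode moves it; the suite asserts this directly
after each load (taken from the HiveCommandSuite change below, where
"local" and "path" come from the parameterized test):

    // After "LOAD DATA LOCAL" the source must still exist (copy);
    // after plain "LOAD DATA" it must be gone (move).
    sql(s"""$loadQuery INPATH "$path" INTO TABLE non_part_table""")
    assert(local === path.exists())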

Tested with existing unit tests and also ran some Hive integration tests
with a version of Hive containing the changes that surfaced the problem.

Author: Marcelo Vanzin <[email protected]>

Closes #16179 from vanzin/SPARK-18752.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/476b34c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/476b34c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/476b34c2

Branch: refs/heads/master
Commit: 476b34c23a1ece1d52654482a393003756957ad2
Parents: bf42c2d
Author: Marcelo Vanzin <[email protected]>
Authored: Mon Dec 12 14:19:42 2016 -0800
Committer: gatorsmile <[email protected]>
Committed: Mon Dec 12 14:19:42 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/ExternalCatalog.scala  |   6 +-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |   6 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   |  10 +-
 .../spark/sql/execution/command/tables.scala    |   8 +-
 .../spark/sql/hive/HiveExternalCatalog.scala    |  12 +-
 .../spark/sql/hive/client/HiveClient.scala      |   6 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  12 +-
 .../apache/spark/sql/hive/client/HiveShim.scala |  28 +--
 .../hive/execution/InsertIntoHiveTable.scala    |   6 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |   6 +-
 .../sql/hive/execution/HiveCommandSuite.scala   | 172 +++++++++++--------
 11 files changed, 157 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 78897da..0c72964 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -119,7 +119,8 @@ abstract class ExternalCatalog {
       table: String,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean): Unit
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit
 
   def loadPartition(
       db: String,
@@ -128,7 +129,8 @@ abstract class ExternalCatalog {
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit
 
   def loadDynamicPartitions(
       db: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a6bebe1..816e4af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -312,7 +312,8 @@ class InMemoryCatalog(
       table: String,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean): Unit = {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = {
     throw new UnsupportedOperationException("loadTable is not implemented")
   }
 
@@ -323,7 +324,8 @@ class InMemoryCatalog(
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit = {
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit = {
     throw new UnsupportedOperationException("loadPartition is not 
implemented.")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 7a3d209..e996a83 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -311,12 +311,13 @@ class SessionCatalog(
       name: TableIdentifier,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean): Unit = {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
+    externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime, isSrcLocal)
   }
 
   /**
@@ -330,13 +331,14 @@ class SessionCatalog(
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit = {
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Some(db)))
     externalCatalog.loadPartition(
-      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs)
+      db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 32e2f75..d2a7556 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -203,7 +203,7 @@ case class LoadDataCommand(
         throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB 
is partitioned, " +
           s"but number of columns in provided partition spec 
(${partition.get.size}) " +
           s"do not match number of partitioned columns in table " +
-          s"(s${targetTable.partitionColumnNames.size})")
+          s"(${targetTable.partitionColumnNames.size})")
       }
       partition.get.keys.foreach { colName =>
         if (!targetTable.partitionColumnNames.contains(colName)) {
@@ -297,13 +297,15 @@ case class LoadDataCommand(
         partition.get,
         isOverwrite,
         holdDDLTime = false,
-        inheritTableSpecs = true)
+        inheritTableSpecs = true,
+        isSrcLocal = isLocal)
     } else {
       catalog.loadTable(
         targetTable.identifier,
         loadPath.toString,
         isOverwrite,
-        holdDDLTime = false)
+        holdDDLTime = false,
+        isSrcLocal = isLocal)
     }
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f67ddc9..544f277 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -736,13 +736,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       loadPath: String,
       isOverwrite: Boolean,
-      holdDDLTime: Boolean): Unit = withClient {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = withClient {
     requireTableExists(db, table)
     client.loadTable(
       loadPath,
       s"$db.$table",
       isOverwrite,
-      holdDDLTime)
+      holdDDLTime,
+      isSrcLocal)
   }
 
   override def loadPartition(
@@ -752,7 +754,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       partition: TablePartitionSpec,
       isOverwrite: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit = withClient {
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit = withClient {
     requireTableExists(db, table)
 
     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
@@ -771,7 +774,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       orderedPartitionSpec,
       isOverwrite,
       holdDDLTime,
-      inheritTableSpecs)
+      inheritTableSpecs,
+      isSrcLocal)
   }
 
   override def loadDynamicPartitions(

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 8e7c871..837b6c5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -211,14 +211,16 @@ private[hive] trait HiveClient {
       partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
       replace: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit
 
   /** Loads data into an existing table. */
   def loadTable(
       loadPath: String, // TODO URI
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit
 
   /** Loads new dynamic partitions into an existing table. */
   def loadDynamicPartitions(

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index db73596..b75f6e9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -651,7 +651,8 @@ private[hive] class HiveClientImpl(
       partSpec: java.util.LinkedHashMap[String, String],
       replace: Boolean,
       holdDDLTime: Boolean,
-      inheritTableSpecs: Boolean): Unit = withHiveState {
+      inheritTableSpecs: Boolean,
+      isSrcLocal: Boolean): Unit = withHiveState {
     val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
     shim.loadPartition(
       client,
@@ -661,20 +662,23 @@ private[hive] class HiveClientImpl(
       replace,
       holdDDLTime,
       inheritTableSpecs,
-      isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
+      isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
+      isSrcLocal = isSrcLocal)
   }
 
   def loadTable(
       loadPath: String, // TODO URI
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit = withHiveState {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = withHiveState {
     shim.loadTable(
       client,
       new Path(loadPath),
       tableName,
       replace,
-      holdDDLTime)
+      holdDDLTime,
+      isSrcLocal)
   }
 
   def loadDynamicPartitions(

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index e561706..137ec26 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -98,14 +98,16 @@ private[client] sealed abstract class Shim {
       replace: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit
 
   def loadTable(
       hive: Hive,
       loadPath: Path,
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit
 
   def loadDynamicPartitions(
       hive: Hive,
@@ -332,7 +334,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       replace: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
     loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
       holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
   }
@@ -342,7 +345,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       loadPath: Path,
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit = {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = {
     loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
   }
 
@@ -698,10 +702,11 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       replace: Boolean,
       holdDDLTime: Boolean,
       inheritTableSpecs: Boolean,
-      isSkewedStoreAsSubdir: Boolean): Unit = {
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
     loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
       holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
-      isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE)
+      isSrcLocal: JBoolean, JBoolean.FALSE)
   }
 
   override def loadTable(
@@ -709,9 +714,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       loadPath: Path,
       tableName: String,
       replace: Boolean,
-      holdDDLTime: Boolean): Unit = {
+      holdDDLTime: Boolean,
+      isSrcLocal: Boolean): Unit = {
     loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean,
-      isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE)
+      isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE)
   }
 
   override def loadDynamicPartitions(
@@ -749,12 +755,6 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       TimeUnit.MILLISECONDS).asInstanceOf[Long]
   }
 
-  protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = {
-    val localFs = FileSystem.getLocal(conf)
-    val pathFs = FileSystem.get(path.toUri(), conf)
-    localFs.getUri() == pathFs.getUri()
-  }
-
 }
 
 private[client] class Shim_v1_0 extends Shim_v0_14 {

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 5f5c8e2..db2239d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -316,7 +316,8 @@ case class InsertIntoHiveTable(
             partitionSpec,
             isOverwrite = doHiveOverwrite,
             holdDDLTime = holdDDLTime,
-            inheritTableSpecs = inheritTableSpecs)
+            inheritTableSpecs = inheritTableSpecs,
+            isSrcLocal = false)
         }
       }
     } else {
@@ -325,7 +326,8 @@ case class InsertIntoHiveTable(
         table.catalogTable.identifier.table,
         outputPath.toString, // TODO: URI
         overwrite,
-        holdDDLTime)
+        holdDDLTime,
+        isSrcLocal = false)
     }
 
     // Invalidate the cache.

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 79e76b3..a001048 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -172,7 +172,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
         emptyDir,
         tableName = "src",
         replace = false,
-        holdDDLTime = false)
+        holdDDLTime = false,
+        isSrcLocal = false)
     }
 
     test(s"$version: tableExists") {
@@ -310,7 +311,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
         partSpec,
         replace = false,
         holdDDLTime = false,
-        inheritTableSpecs = false)
+        inheritTableSpecs = false,
+        isSrcLocal = false)
     }
 
     test(s"$version: loadDynamicPartitions") {

http://git-wip-us.apache.org/repos/asf/spark/blob/476b34c2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 46ed18c..1680f6c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.File
+
+import com.google.common.io.Files
+
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -154,7 +158,39 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     }
   }
 
-  test("LOAD DATA") {
+  Seq(true, false).foreach { local =>
+    val loadQuery = if (local) "LOAD DATA LOCAL" else "LOAD DATA"
+    test(loadQuery) {
+      testLoadData(loadQuery, local)
+    }
+  }
+
+  private def testLoadData(loadQuery: String, local: Boolean): Unit = {
+    // employee.dat has two columns separated by '|', the first is an int, the second is a string.
+    // Its content looks like:
+    // 16|john
+    // 17|robert
+    val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalFile()
+
+    /**
+     * Run a function with a copy of the input data file when running with non-local input. The
+     * semantics in this mode are that the input file is moved to the destination, so we have
+     * to make a copy so that subsequent tests have access to the original file.
+     */
+    def withInputFile(fn: File => Unit): Unit = {
+      if (local) {
+        fn(testData)
+      } else {
+        val tmp = File.createTempFile(testData.getName(), ".tmp")
+        Files.copy(testData, tmp)
+        try {
+          fn(tmp)
+        } finally {
+          tmp.delete()
+        }
+      }
+    }
+
     withTable("non_part_table", "part_table") {
       sql(
         """
@@ -164,18 +200,49 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
           |LINES TERMINATED BY '\n'
         """.stripMargin)
 
-      // employee.dat has two columns separated by '|', the first is an int, the second is a string.
-      // Its content looks like:
-      // 16|john
-      // 17|robert
-      val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
-
       // LOAD DATA INTO non-partitioned table can't specify partition
       intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table 
PARTITION(ds="1")""")
+        sql(s"""$loadQuery INPATH "$testData" INTO TABLE non_part_table 
PARTITION(ds="1")""")
+      }
+
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "$path" INTO TABLE non_part_table""")
+
+        // Non-local mode is expected to move the file, while local mode is expected to copy it.
+        // Check once here that the behavior is as expected.
+        assert(local === path.exists())
+      }
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Nil)
+
+      // Incorrect URI.
+      // file://path/to/data/files/employee.dat
+      //
+      // TODO: need a similar test for non-local mode.
+      if (local) {
+        val incorrectUri = "file:/" + testData.getAbsolutePath()
+        intercept[AnalysisException] {
+          sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE 
non_part_table""")
+        }
+      }
+
+      // Use URI as inpath:
+      // file:/path/to/data/files/employee.dat
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "${path.toURI()}" INTO TABLE 
non_part_table""")
+      }
+
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Row(16, "john") :: Nil)
+
+      // Overwrite existing data.
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "${path.toURI()}" OVERWRITE INTO TABLE 
non_part_table""")
       }
 
-      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""")
       checkAnswer(
         sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
         Row(16, "john") :: Nil)
@@ -190,87 +257,39 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
         """.stripMargin)
 
       // LOAD DATA INTO partitioned table must specify partition
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table""")
+      withInputFile { path =>
+        intercept[AnalysisException] {
+          sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table""")
+        }
+
+        intercept[AnalysisException] {
+          sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table 
PARTITION(c="1")""")
+        }
+        intercept[AnalysisException] {
+          sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table 
PARTITION(d="1")""")
+        }
+        intercept[AnalysisException] {
+          sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table 
PARTITION(c="1", k="2")""")
+        }
       }
 
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table 
PARTITION(c="1")""")
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table 
PARTITION(c="1", d="2")""")
       }
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table 
PARTITION(d="1")""")
-      }
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table 
PARTITION(c="1", k="2")""")
-      }
-
-      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table 
PARTITION(c="1", d="2")""")
       checkAnswer(
         sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND 
d = '2'"),
         sql("SELECT * FROM non_part_table").collect())
 
       // Different order of partition columns.
-      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table 
PARTITION(d="1", c="2")""")
+      withInputFile { path =>
+        sql(s"""$loadQuery INPATH "$path" INTO TABLE part_table 
PARTITION(d="1", c="2")""")
+      }
       checkAnswer(
         sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND 
d = '1'"),
         sql("SELECT * FROM non_part_table").collect())
     }
   }
 
-  test("LOAD DATA: input path") {
-    withTable("non_part_table") {
-      sql(
-        """
-          |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
-          |ROW FORMAT DELIMITED
-          |FIELDS TERMINATED BY '|'
-          |LINES TERMINATED BY '\n'
-        """.stripMargin)
-
-      // Non-existing inpath
-      intercept[AnalysisException] {
-        sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE 
non_part_table""")
-      }
-
-      val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
-
-      // Non-local inpath: without URI Scheme and Authority
-      sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
-      checkAnswer(
-        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
-        Row(16, "john") :: Nil)
-
-      // Use URI as LOCAL inpath:
-      // file:/path/to/data/files/employee.dat
-      val uri = "file:" + testData
-      sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""")
-
-      checkAnswer(
-        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
-        Row(16, "john") :: Row(16, "john") :: Nil)
-
-      // Use URI as non-LOCAL inpath
-      sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""")
-
-      checkAnswer(
-        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
-        Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil)
-
-      sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""")
-
-      checkAnswer(
-        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
-        Row(16, "john") :: Nil)
-
-      // Incorrect URI:
-      // file://path/to/data/files/employee.dat
-      val incorrectUri = "file:/" + testData
-      intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$incorrectUri" INTO TABLE 
non_part_table""")
-      }
-    }
-  }
-
   test("Truncate Table") {
     withTable("non_part_table", "part_table") {
       sql(
@@ -418,4 +437,5 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       assert(sql("SHOW PARTITIONS part_datasrc").count() == 3)
     }
   }
+
 }

