Repository: spark Updated Branches: refs/heads/branch-2.0 ca271c792 -> 3def56120
[SPARK-15431][SQL] Support LIST FILE(s)|JAR(s) command natively ## What changes were proposed in this pull request? Currently, the command `ADD FILE|JAR <filepath | jarpath>` is supported natively in SparkSQL. However, when this command is run, the file/jar is added to resources that cannot be looked up with the `LIST FILE(s)|JAR(s)` command, because the `LIST` command is passed to the Hive command processor in Spark-SQL or is simply not supported in Spark-shell. As a result, users have no way to find out which files/jars have been added to the Spark context. Refer to [Hive commands](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli). This PR adds support for the following commands: `LIST (FILE[s] [filepath ...] | JAR[s] [jarfile ...])` ### For example: ##### LIST FILE(s) ``` scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt") res1: org.apache.spark.sql.DataFrame = [] scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt") res2: org.apache.spark.sql.DataFrame = [] scala> spark.sql("list file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt").show(false) +----------------------------------------------+ |result | +----------------------------------------------+ |hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt| +----------------------------------------------+ scala> spark.sql("list files").show(false) +----------------------------------------------+ |result | +----------------------------------------------+ |hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt| |hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt | +----------------------------------------------+ ``` ##### LIST JAR(s) ``` scala> spark.sql("add jar /Users/xinwu/spark/core/src/test/resources/TestUDTF.jar") res9: org.apache.spark.sql.DataFrame = [result: int] scala> spark.sql("list jar TestUDTF.jar").show(false) +---------------------------------------------+ |result | +---------------------------------------------+ |spark://192.168.1.234:50131/jars/TestUDTF.jar| 
+---------------------------------------------+ scala> spark.sql("list jars").show(false) +---------------------------------------------+ |result | +---------------------------------------------+ |spark://192.168.1.234:50131/jars/TestUDTF.jar| +---------------------------------------------+ ``` ## How was this patch tested? New test cases are added for Spark-SQL, Spark-Shell and SparkContext API code path. Author: Xin Wu <xi...@us.ibm.com> Author: xin Wu <xi...@us.ibm.com> Closes #13212 from xwu0226/list_command. (cherry picked from commit 01659bc50cd3d53815d205d005c3678e714c08e0) Signed-off-by: Cheng Lian <l...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3def5612 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3def5612 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3def5612 Branch: refs/heads/branch-2.0 Commit: 3def56120e0a57238c95f64e7bb84f973b22c065 Parents: ca271c7 Author: Xin Wu <xi...@us.ibm.com> Authored: Mon May 23 17:32:01 2016 -0700 Committer: Cheng Lian <l...@databricks.com> Committed: Mon May 23 17:32:15 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 10 ++++ core/src/test/resources/TestUDTF.jar | Bin 0 -> 1328 bytes .../org/apache/spark/SparkContextSuite.scala | 14 ++++- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 5 +- .../spark/sql/execution/SparkSqlParser.scala | 39 +++++++++++-- .../spark/sql/execution/command/resources.scala | 57 ++++++++++++++++++- .../hive/thriftserver/SparkSQLCLIDriver.scala | 10 ++-- .../src/test/resources/TestUDTF.jar | Bin 0 -> 1328 bytes .../spark/sql/hive/thriftserver/CliSuite.scala | 19 +++++++ .../sql/hive/execution/HiveQuerySuite.scala | 12 ++++ 10 files changed, 149 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e6cdd0d..351024b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1387,6 +1387,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** + * Returns a list of file paths that are added to resources. + */ + def listFiles(): Seq[String] = addedFiles.keySet.toSeq + + /** * Add a file to be downloaded with this Spark job on every node. * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, @@ -1724,6 +1729,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli postEnvironmentUpdate() } + /** + * Returns a list of jar files that are added to resources. + */ + def listJars(): Seq[String] = addedJars.keySet.toSeq + // Shut down the SparkContext. 
def stop() { if (LiveListenerBus.withinListenerThread.value) { http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/core/src/test/resources/TestUDTF.jar ---------------------------------------------------------------------- diff --git a/core/src/test/resources/TestUDTF.jar b/core/src/test/resources/TestUDTF.jar new file mode 100644 index 0000000..514f2d5 Binary files /dev/null and b/core/src/test/resources/TestUDTF.jar differ http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 6398708..ae66513 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -108,7 +108,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { assert(byteArray2.length === 0) } - test("addFile works") { + test("basic case for addFile and listFiles") { val dir = Utils.createTempDir() val file1 = File.createTempFile("someprefix1", "somesuffix1", dir) @@ -156,6 +156,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { } x }).count() + assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1) + } finally { + sc.stop() + } + } + + test("add and list jar files") { + val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar") + try { + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + sc.addJar(jarPath.toString) + assert(sc.listJars().filter(_.contains("TestUDTF.jar")).size == 1) } finally { sc.stop() } http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---------------------------------------------------------------------- diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 8ea8f76..403191a 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -117,7 +117,7 @@ statement tableIdentifier partitionSpec? #loadData | TRUNCATE TABLE tableIdentifier partitionSpec? (COLUMNS identifierList)? #truncateTable - | ADD identifier .*? #addResource + | op=(ADD | LIST) identifier .*? #manageResource | SET ROLE .*? #failNativeCommand | SET .*? #setConfiguration | RESET #resetConfiguration @@ -642,7 +642,7 @@ nonReserved | SORT | CLUSTER | DISTRIBUTE | UNSET | TBLPROPERTIES | SKEWED | STORED | DIRECTORIES | LOCATION | EXCHANGE | ARCHIVE | UNARCHIVE | FILEFORMAT | TOUCH | COMPACT | CONCATENATE | CHANGE | CASCADE | RESTRICT | BUCKETS | CLUSTERED | SORTED | PURGE | INPUTFORMAT | OUTPUTFORMAT - | DBPROPERTIES | DFS | TRUNCATE | COMPUTE + | DBPROPERTIES | DFS | TRUNCATE | COMPUTE | LIST | STATISTICS | ANALYZE | PARTITIONED | EXTERNAL | DEFINED | RECORDWRITER | REVOKE | GRANT | LOCK | UNLOCK | MSCK | REPAIR | EXPORT | IMPORT | LOAD | VALUES | COMMENT | ROLE | ROLES | COMPACTIONS | PRINCIPALS | TRANSACTIONS | INDEX | INDEXES | LOCKS | OPTION | LOCAL | INPATH @@ -843,6 +843,7 @@ DFS: 'DFS'; TRUNCATE: 'TRUNCATE'; ANALYZE: 'ANALYZE'; COMPUTE: 'COMPUTE'; +LIST: 'LIST'; STATISTICS: 'STATISTICS'; PARTITIONED: 'PARTITIONED'; EXTERNAL: 'EXTERNAL'; http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 6e4af95..f85d606 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -774,13 +774,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create an [[AddJarCommand]] or [[AddFileCommand]] command depending on the requested resource. + * Create a [[AddFileCommand]], [[AddJarCommand]], [[ListFilesCommand]] or [[ListJarsCommand]] + * command depending on the requested operation on resources. + * Expected format: + * {{{ + * ADD (FILE[S] <filepath ...> | JAR[S] <jarpath ...>) + * LIST (FILE[S] [filepath ...] | JAR[S] [jarpath ...]) + * }}} */ - override def visitAddResource(ctx: AddResourceContext): LogicalPlan = withOrigin(ctx) { - ctx.identifier.getText.toLowerCase match { - case "file" => AddFileCommand(remainder(ctx.identifier).trim) - case "jar" => AddJarCommand(remainder(ctx.identifier).trim) - case other => throw operationNotAllowed(s"ADD with resource type '$other'", ctx) + override def visitManageResource(ctx: ManageResourceContext): LogicalPlan = withOrigin(ctx) { + val mayebePaths = remainder(ctx.identifier).trim + ctx.op.getType match { + case SqlBaseParser.ADD => + ctx.identifier.getText.toLowerCase match { + case "file" => AddFileCommand(mayebePaths) + case "jar" => AddJarCommand(mayebePaths) + case other => throw operationNotAllowed(s"ADD with resource type '$other'", ctx) + } + case SqlBaseParser.LIST => + ctx.identifier.getText.toLowerCase match { + case "files" | "file" => + if (mayebePaths.length > 0) { + ListFilesCommand(mayebePaths.split("\\s+")) + } else { + ListFilesCommand() + } + case "jars" | "jar" => + if (mayebePaths.length > 0) { + ListJarsCommand(mayebePaths.split("\\s+")) + } else { + ListJarsCommand() + } + case other => throw operationNotAllowed(s"LIST with resource type '$other'", ctx) + } + case _ => throw operationNotAllowed(s"Other types of operation on resources", ctx) } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala index 162d493..20b0894 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala @@ -17,9 +17,14 @@ package org.apache.spark.sql.execution.command +import java.io.File +import java.net.URI + +import org.apache.hadoop.fs.Path + import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} /** * Adds a jar to the current session so it can be used (for UDFs or serdes). @@ -46,3 +51,51 @@ case class AddFileCommand(path: String) extends RunnableCommand { Seq.empty[Row] } } + +/** + * Returns a list of file paths that are added to resources. + * If file paths are provided, return the ones that are added to resources. 
+ */ +case class ListFilesCommand(files: Seq[String] = Seq.empty[String]) extends RunnableCommand { + override val output: Seq[Attribute] = { + AttributeReference("Results", StringType, nullable = false)() :: Nil + } + override def run(sparkSession: SparkSession): Seq[Row] = { + val fileList = sparkSession.sparkContext.listFiles() + if (files.size > 0) { + files.map { f => + val uri = new URI(f) + val schemeCorrectedPath = uri.getScheme match { + case null | "local" => new File(f).getCanonicalFile.toURI.toString + case _ => f + } + new Path(schemeCorrectedPath).toUri.toString + }.collect { + case f if fileList.contains(f) => f + }.map(Row(_)) + } else { + fileList.map(Row(_)) + } + } +} + +/** + * Returns a list of jar files that are added to resources. + * If jar files are provided, return the ones that are added to resources. + */ +case class ListJarsCommand(jars: Seq[String] = Seq.empty[String]) extends RunnableCommand { + override val output: Seq[Attribute] = { + AttributeReference("Results", StringType, nullable = false)() :: Nil + } + override def run(sparkSession: SparkSession): Seq[Row] = { + val jarList = sparkSession.sparkContext.listJars() + if (jars.nonEmpty) { + for { + jarName <- jars.map(f => new Path(f).getName) + jarPath <- jarList if jarPath.contains(jarName) + } yield Row(jarPath) + } else { + jarList.map(Row(_)) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 33ff8ae..7389e18 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -32,8 +32,7 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.exec.Utilities -import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor} -import org.apache.hadoop.hive.ql.processors.{CommandProcessorFactory, ResetProcessor, SetProcessor} +import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.thrift.transport.TSocket @@ -295,9 +294,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { System.exit(0) } if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") || - cmd_trimmed.startsWith("!") || - tokens(0).toLowerCase.equals("list") || - isRemoteMode) { + cmd_trimmed.startsWith("!") || isRemoteMode) { val start = System.currentTimeMillis() super.processCmd(cmd) val end = System.currentTimeMillis() @@ -312,7 +309,8 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { if (proc != null) { // scalastyle:off println if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor] || - proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ResetProcessor]) { + proc.isInstanceOf[AddResourceProcessor] || proc.isInstanceOf[ListResourceProcessor] || + proc.isInstanceOf[ResetProcessor] ) { val driver = new SparkSQLDriver driver.init() http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/sql/hive-thriftserver/src/test/resources/TestUDTF.jar ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/test/resources/TestUDTF.jar b/sql/hive-thriftserver/src/test/resources/TestUDTF.jar new file mode 100644 index 0000000..514f2d5 Binary files /dev/null and b/sql/hive-thriftserver/src/test/resources/TestUDTF.jar differ 
http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 3fa2f88..2bf0221 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -238,4 +238,23 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { runCliWithin(2.minute, Seq("-e", "!echo \"This is a test for Spark-11624\";"))( "" -> "This is a test for Spark-11624") } + + test("list jars") { + val jarFile = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar") + runCliWithin(2.minute)( + s"ADD JAR $jarFile" -> "", + s"LIST JARS" -> "TestUDTF.jar", + s"List JAR $jarFile" -> "TestUDTF.jar" + ) + } + + test("list files") { + val dataFilePath = Thread.currentThread().getContextClassLoader + .getResource("data/files/small_kv.txt") + runCliWithin(2.minute)( + s"ADD FILE $dataFilePath" -> "", + s"LIST FILES" -> "small_kv.txt", + s"LIST FILE $dataFilePath" -> "small_kv.txt" + ) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/3def5612/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index e179021..e0f6ccf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -876,6 
+876,13 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE t1""") sql("select * from src join t1 on src.key = t1.a") sql("DROP TABLE t1") + assert(sql("list jars"). + filter(_.getString(0).contains("hive-hcatalog-core-0.13.1.jar")).count() > 0) + assert(sql("list jar"). + filter(_.getString(0).contains("hive-hcatalog-core-0.13.1.jar")).count() > 0) + val testJar2 = TestHive.getHiveFile("TestUDTF.jar").getCanonicalPath + sql(s"ADD JAR $testJar2") + assert(sql(s"list jar $testJar").count() == 1) } test("CREATE TEMPORARY FUNCTION") { @@ -899,6 +906,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } assert(checkAddFileRDD.first()) + assert(sql("list files"). + filter(_.getString(0).contains("data/files/v1.txt")).count() > 0) + assert(sql("list file"). + filter(_.getString(0).contains("data/files/v1.txt")).count() > 0) + assert(sql(s"list file $testFile").count() == 1) } createQueryTest("dynamic_partition", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org