Repository: spark
Updated Branches:
  refs/heads/master 9bbe0171c -> 35ef853b3


[SPARK-9397] DataFrame should provide an API to find source data files if 
applicable

Certain applications would benefit from being able to inspect DataFrames that 
are produced directly by file-based data sources and find out which files hold 
their source data. For example, one might want to display to a user the size 
of the data underlying a table, or to copy or mutate it.

This PR exposes an `inputFiles` method on DataFrame which attempts to discover 
the source data in a best-effort manner, by inspecting HadoopFsRelations and 
JSONRelations.

Author: Aaron Davidson <[email protected]>

Closes #7717 from aarondav/paths and squashes the following commits:

ff67430 [Aaron Davidson] inputFiles
0acd3ad [Aaron Davidson] [SPARK-9397] DataFrame should provide an API to find 
source data files if applicable


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35ef853b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35ef853b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35ef853b

Branch: refs/heads/master
Commit: 35ef853b3f9d955949c464e4a0d445147e0e9a07
Parents: 9bbe017
Author: Aaron Davidson <[email protected]>
Authored: Tue Jul 28 10:12:09 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Tue Jul 28 10:12:09 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  | 20 ++++++++++++++++++--
 .../org/apache/spark/sql/DataFrameSuite.scala   | 20 ++++++++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  6 +++---
 3 files changed, 41 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/35ef853b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 114ab91..3ea0f9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -40,8 +40,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, 
SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, 
LogicalRDD}
-import org.apache.spark.sql.execution.datasources.CreateTableUsingAsSelect
-import org.apache.spark.sql.json.JacksonGenerator
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, 
LogicalRelation}
+import org.apache.spark.sql.json.{JacksonGenerator, JSONRelation}
+import org.apache.spark.sql.sources.HadoopFsRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -1546,6 +1547,21 @@ class DataFrame private[sql](
     }
   }
 
+  /**
+   * Returns a best-effort snapshot of the files that compose this DataFrame. This method simply
+   * asks each constituent BaseRelation for its respective files and takes the union of all results.
+   * Depending on the source relations, this may not find all input files. Duplicates are removed.
+   */
+  def inputFiles: Array[String] = {
+    val files: Seq[String] = logicalPlan.collect {
+      case LogicalRelation(fsBasedRelation: HadoopFsRelation) =>
+        fsBasedRelation.paths.toSeq
+      case LogicalRelation(jsonRelation: JSONRelation) =>
+        jsonRelation.path.toSeq
+    }.flatten
+    files.toSet.toArray
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   // for Python API
   ////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/spark/blob/35ef853b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f67f2c6..3151e07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -23,7 +23,10 @@ import scala.language.postfixOps
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.json.JSONRelation
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
 
@@ -491,6 +494,23 @@ class DataFrameSuite extends QueryTest with SQLTestUtils {
     checkAnswer(df.select(df("key")), testData.select('key).collect().toSeq)
   }
 
+  test("inputFiles") {
+    val fakeRelation1 = new ParquetRelation(Array("/my/path", 
"/my/other/path"),
+      Some(testData.schema), None, Map.empty)(sqlContext)
+    val df1 = DataFrame(sqlContext, LogicalRelation(fakeRelation1))
+    assert(df1.inputFiles.toSet == fakeRelation1.paths.toSet)
+
+    val fakeRelation2 = new JSONRelation("/json/path", 1, 
Some(testData.schema), sqlContext)
+    val df2 = DataFrame(sqlContext, LogicalRelation(fakeRelation2))
+    assert(df2.inputFiles.toSet == fakeRelation2.path.toSet)
+
+    val unionDF = df1.unionAll(df2)
+    assert(unionDF.inputFiles.toSet == fakeRelation1.paths.toSet ++ 
fakeRelation2.path)
+
+    val filtered = df1.filter("false").unionAll(df2.intersect(df2))
+    assert(filtered.inputFiles.toSet == fakeRelation1.paths.toSet ++ 
fakeRelation2.path)
+  }
+
   ignore("show") {
     // This test case is intended ignored, but to make sure it compiles correctly
     testData.select($"*").show()

http://git-wip-us.apache.org/repos/asf/spark/blob/35ef853b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3180c05..a8c9b4f 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -274,9 +274,9 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
 
-    // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
+    // NOTE: Instead of passing Metastore schema directly to `ParquetRelation`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
-    // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    // evil case insensitivity issue, which is reconciled within `ParquetRelation`.
     val parquetOptions = Map(
       ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
       ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
@@ -290,7 +290,7 @@ private[hive] class HiveMetastoreCatalog(val client: 
ClientInterface, hive: Hive
         partitionSpecInMetastore: Option[PartitionSpec]): 
Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical@LogicalRelation(parquetRelation: ParquetRelation) =>
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to