[SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames

- The old implicit would convert RDDs directly to DataFrames, which added too many methods to RDD (the renamed APIs are sketched after this list).
- toDataFrame -> toDF
- Dsl -> functions
- implicits moved into SQLContext.implicits
- addColumn -> withColumn
- renameColumn -> withColumnRenamed
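
A minimal before/after sketch of the renamed Scala API (illustrative only,
not part of this patch; it assumes a spark-shell style `sc: SparkContext`
and a Record(key: Int, value: String) case class like the one in the
RDDRelation example):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.functions._  // was: import org.apache.spark.sql.Dsl._

    case class Record(key: Int, value: String)

    val sqlContext = new SQLContext(sc)
    // RDDs are no longer converted implicitly; import the implicits and call toDF explicitly.
    import sqlContext.implicits._

    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()  // was: toDataFrame

    val added   = df.withColumn("key2", col("key") + 1)          // was: addColumn
    val renamed = added.withColumnRenamed("key2", "keyPlusOne")  // was: renameColumn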

Python changes:
- toDataFrame -> toDF
- Dsl -> functions package
- addColumn -> withColumn
- renameColumn -> withColumnRenamed
- add a toDF() method to RDD when a SQLContext is initialized
- add flatMap to DataFrame

Author: Reynold Xin <r...@databricks.com>
Author: Davies Liu <dav...@databricks.com>

Closes #4556 from rxin/SPARK-5752 and squashes the following commits:

5ef9910 [Reynold Xin] More fix
61d3fca [Reynold Xin] Merge branch 'df5' of github.com:davies/spark into SPARK-5752
ff5832c [Reynold Xin] Fix python
749c675 [Reynold Xin] count(*) fixes.
5806df0 [Reynold Xin] Fix build break again.
d941f3d [Reynold Xin] Fixed explode compilation break.
fe1267a [Davies Liu] flatMap
c4afb8e [Reynold Xin] style
d9de47f [Davies Liu] add comment
b783994 [Davies Liu] add comment for toDF
e2154e5 [Davies Liu] schema() -> schema
3a1004f [Davies Liu] Dsl -> functions, toDF()
fb256af [Reynold Xin] - toDataFrame -> toDF - Dsl -> functions - implicits moved into SQLContext.implicits - addColumn -> withColumn - renameColumn -> withColumnRenamed
0dd74eb [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
97dd47c [Davies Liu] fix mistake
6168f74 [Davies Liu] fix test
1fc0199 [Davies Liu] fix test
a075cd5 [Davies Liu] clean up, toPandas
663d314 [Davies Liu] add test for agg('*')
9e214d5 [Reynold Xin] count(*) fixes.
1ed7136 [Reynold Xin] Fix build break again.
921b2e3 [Reynold Xin] Fixed explode compilation break.
14698d4 [Davies Liu] flatMap
ba3e12d [Reynold Xin] style
d08c92d [Davies Liu] add comment
5c8b524 [Davies Liu] add comment for toDF
a4e5e66 [Davies Liu] schema() -> schema
d377fc9 [Davies Liu] Dsl -> functions, toDF()
6b3086c [Reynold Xin] - toDataFrame -> toDF - Dsl -> functions - implicits moved into SQLContext.implicits - addColumn -> withColumn - renameColumn -> withColumnRenamed
807e8b1 [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e98dfe62
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e98dfe62
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e98dfe62

Branch: refs/heads/master
Commit: e98dfe627c5d0201464cdd0f363f391ea84c389a
Parents: 0ce4e43
Author: Reynold Xin <r...@databricks.com>
Authored: Fri Feb 13 23:03:22 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Feb 13 23:03:22 2015 -0800

----------------------------------------------------------------------
 .../examples/ml/CrossValidatorExample.scala     |   4 +-
 .../spark/examples/ml/DeveloperApiExample.scala |   4 +-
 .../apache/spark/examples/ml/MovieLensALS.scala |   6 +-
 .../spark/examples/ml/SimpleParamsExample.scala |   6 +-
 .../ml/SimpleTextClassificationPipeline.scala   |   4 +-
 .../spark/examples/mllib/DatasetExample.scala   |   8 +-
 .../apache/spark/examples/sql/RDDRelation.scala |  10 +-
 .../spark/examples/sql/hive/HiveFromSpark.scala |   2 +-
 .../scala/org/apache/spark/ml/Transformer.scala |   6 +-
 .../spark/ml/classification/Classifier.scala    |  16 +-
 .../ml/classification/LogisticRegression.scala  |  33 +-
 .../ProbabilisticClassifier.scala               |   6 +-
 .../spark/ml/feature/StandardScaler.scala       |   4 +-
 .../spark/ml/impl/estimator/Predictor.scala     |   4 +-
 .../apache/spark/ml/recommendation/ALS.scala    |   6 +-
 .../spark/mllib/classification/NaiveBayes.scala |   2 +-
 .../impl/GLMClassificationModel.scala           |   2 +-
 .../MatrixFactorizationModel.scala              |   4 +-
 .../regression/impl/GLMRegressionModel.scala    |   2 +-
 .../mllib/tree/model/DecisionTreeModel.scala    |   2 +-
 .../mllib/tree/model/treeEnsembleModels.scala   |   2 +-
 .../spark/ml/recommendation/ALSSuite.scala      |   4 +-
 python/docs/pyspark.sql.rst                     |   8 +
 python/pyspark/mllib/tests.py                   |   2 +-
 python/pyspark/sql/__init__.py                  |   3 +-
 python/pyspark/sql/context.py                   |  34 +-
 python/pyspark/sql/dataframe.py                 | 221 +++-------
 python/pyspark/sql/functions.py                 | 170 ++++++++
 python/pyspark/sql/tests.py                     |  38 +-
 python/run-tests                                |   3 +-
 .../org/apache/spark/repl/SparkILoopInit.scala  |   2 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala |   2 +-
 .../org/apache/spark/repl/SparkILoop.scala      |   2 +-
 .../sql/catalyst/analysis/unresolved.scala      |   2 +-
 .../scala/org/apache/spark/sql/Column.scala     |  21 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |  25 +-
 .../org/apache/spark/sql/DataFrameHolder.scala  |  30 ++
 .../org/apache/spark/sql/DataFrameImpl.scala    |   6 +-
 .../main/scala/org/apache/spark/sql/Dsl.scala   | 428 -------------------
 .../org/apache/spark/sql/GroupedData.scala      |  19 +-
 .../apache/spark/sql/IncomputableColumn.scala   |   6 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  35 +-
 .../apache/spark/sql/UserDefinedFunction.scala  |   4 +-
 .../scala/org/apache/spark/sql/functions.scala  | 425 ++++++++++++++++++
 .../apache/spark/sql/parquet/ParquetTest.scala  |   2 +-
 .../org/apache/spark/sql/api/java/JavaDsl.java  |   2 +-
 .../org/apache/spark/sql/CachedTableSuite.scala |   7 +-
 .../spark/sql/ColumnExpressionSuite.scala       |  10 +-
 .../spark/sql/DataFrameImplicitsSuite.scala     |  10 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |  51 ++-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |   3 +-
 .../org/apache/spark/sql/ListTablesSuite.scala  |   2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |   8 +-
 .../sql/ScalaReflectionRelationSuite.scala      |  10 +-
 .../scala/org/apache/spark/sql/TestData.scala   |  46 +-
 .../scala/org/apache/spark/sql/UDFSuite.scala   |   2 +-
 .../apache/spark/sql/UserDefinedTypeSuite.scala |   4 +-
 .../columnar/InMemoryColumnarQuerySuite.scala   |   8 +-
 .../columnar/PartitionBatchPruningSuite.scala   |   5 +-
 .../spark/sql/execution/PlannerSuite.scala      |   3 +-
 .../org/apache/spark/sql/json/JsonSuite.scala   |   7 +-
 .../spark/sql/parquet/ParquetIOSuite.scala      |   6 +-
 .../sql/hive/InsertIntoHiveTableSuite.scala     |   8 +-
 .../apache/spark/sql/hive/ListTablesSuite.scala |   2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  17 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |  12 +-
 .../hive/execution/HiveResolutionSuite.scala    |   6 +-
 .../sql/hive/execution/HiveTableScanSuite.scala |   3 +-
 .../spark/sql/hive/execution/HiveUdfSuite.scala |  10 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  11 +-
 .../spark/sql/parquet/parquetSuites.scala       |   6 +-
 71 files changed, 1012 insertions(+), 872 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
index a2893f7..f024194 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
@@ -90,7 +90,7 @@ object CrossValidatorExample {
     crossval.setNumFolds(2) // Use 3+ in practice
 
     // Run cross-validation, and choose the best set of parameters.
-    val cvModel = crossval.fit(training)
+    val cvModel = crossval.fit(training.toDF)
 
     // Prepare test documents, which are unlabeled.
     val test = sc.parallelize(Seq(
@@ -100,7 +100,7 @@ object CrossValidatorExample {
       Document(7L, "apache hadoop")))
 
     // Make predictions on test documents. cvModel uses the best model found 
(lrModel).
-    cvModel.transform(test)
+    cvModel.transform(test.toDF)
       .select("id", "text", "probability", "prediction")
       .collect()
       .foreach { case Row(id: Long, text: String, prob: Vector, prediction: 
Double) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
index aed4423..54aadd2 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
@@ -58,7 +58,7 @@ object DeveloperApiExample {
     lr.setMaxIter(10)
 
     // Learn a LogisticRegression model.  This uses the parameters stored in 
lr.
-    val model = lr.fit(training)
+    val model = lr.fit(training.toDF)
 
     // Prepare test data.
     val test = sc.parallelize(Seq(
@@ -67,7 +67,7 @@ object DeveloperApiExample {
       LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))
 
     // Make predictions on test data.
-    val sumPredictions: Double = model.transform(test)
+    val sumPredictions: Double = model.transform(test.toDF)
       .select("features", "label", "prediction")
       .collect()
       .map { case Row(features: Vector, label: Double, prediction: Double) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala 
b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
index 836ea2e..adaf796 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala
@@ -137,9 +137,9 @@ object MovieLensALS {
       .setRegParam(params.regParam)
       .setNumBlocks(params.numBlocks)
 
-    val model = als.fit(training)
+    val model = als.fit(training.toDF)
 
-    val predictions = model.transform(test).cache()
+    val predictions = model.transform(test.toDF).cache()
 
     // Evaluate the model.
     // TODO: Create an evaluator to compute RMSE.
@@ -158,7 +158,7 @@ object MovieLensALS {
 
     // Inspect false positives.
     predictions.registerTempTable("prediction")
-    sc.textFile(params.movies).map(Movie.parseMovie).registerTempTable("movie")
+    sc.textFile(params.movies).map(Movie.parseMovie).toDF.registerTempTable("movie")
     sqlContext.sql(
       """
         |SELECT userId, prediction.movieId, title, rating, prediction

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
index 80c9f5f..c5bb551 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
@@ -58,7 +58,7 @@ object SimpleParamsExample {
       .setRegParam(0.01)
 
     // Learn a LogisticRegression model.  This uses the parameters stored in 
lr.
-    val model1 = lr.fit(training)
+    val model1 = lr.fit(training.toDF)
     // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
     // we can view the parameters it used during fit().
     // This prints the parameter (name: value) pairs, where names are unique 
IDs for this
@@ -77,7 +77,7 @@ object SimpleParamsExample {
 
     // Now learn a new model using the paramMapCombined parameters.
     // paramMapCombined overrides all parameters set earlier via lr.set* 
methods.
-    val model2 = lr.fit(training, paramMapCombined)
+    val model2 = lr.fit(training.toDF, paramMapCombined)
     println("Model 2 was fit using parameters: " + model2.fittingParamMap)
 
     // Prepare test data.
@@ -90,7 +90,7 @@ object SimpleParamsExample {
     // LogisticRegression.transform will only use the 'features' column.
     // Note that model2.transform() outputs a 'myProbability' column instead 
of the usual
     // 'probability' column since we renamed the lr.probabilityCol parameter 
previously.
-    model2.transform(test)
+    model2.transform(test.toDF)
       .select("features", "label", "myProbability", "prediction")
       .collect()
       .foreach { case Row(features: Vector, label: Double, prob: Vector, 
prediction: Double) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
 
b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
index 968cb29..8b47f88 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
@@ -69,7 +69,7 @@ object SimpleTextClassificationPipeline {
       .setStages(Array(tokenizer, hashingTF, lr))
 
     // Fit the pipeline to training documents.
-    val model = pipeline.fit(training)
+    val model = pipeline.fit(training.toDF)
 
     // Prepare test documents, which are unlabeled.
     val test = sc.parallelize(Seq(
@@ -79,7 +79,7 @@ object SimpleTextClassificationPipeline {
       Document(7L, "apache hadoop")))
 
     // Make predictions on test documents.
-    model.transform(test)
+    model.transform(test.toDF)
       .select("id", "text", "probability", "prediction")
       .collect()
       .foreach { case Row(id: Long, text: String, prob: Vector, prediction: 
Double) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala 
b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
index 89b6255..c98c68a 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
@@ -81,18 +81,18 @@ object DatasetExample {
     println(s"Loaded ${origData.count()} instances from file: ${params.input}")
 
     // Convert input data to DataFrame explicitly.
-    val df: DataFrame = origData.toDataFrame
+    val df: DataFrame = origData.toDF
     println(s"Inferred schema:\n${df.schema.prettyJson}")
     println(s"Converted to DataFrame with ${df.count()} records")
 
-    // Select columns, using implicit conversion to DataFrames.
-    val labelsDf: DataFrame = origData.select("label")
+    // Select columns
+    val labelsDf: DataFrame = df.select("label")
     val labels: RDD[Double] = labelsDf.map { case Row(v: Double) => v }
     val numLabels = labels.count()
     val meanLabel = labels.fold(0.0)(_ + _) / numLabels
     println(s"Selected label column with average value $meanLabel")
 
-    val featuresDf: DataFrame = origData.select("features")
+    val featuresDf: DataFrame = df.select("features")
     val features: RDD[Vector] = featuresDf.map { case Row(v: Vector) => v }
     val featureSummary = features.aggregate(new 
MultivariateOnlineSummarizer())(
       (summary, feat) => summary.add(feat),

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala 
b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
index 1eac3c8..79d3d5a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.examples.sql
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 
 // One method for defining the schema of an RDD is to make a case class with 
the desired column
 // names and types.
@@ -34,10 +34,10 @@ object RDDRelation {
     // Importing the SQL context gives access to all the SQL functions and 
implicit conversions.
     import sqlContext.implicits._
 
-    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
+    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF
     // Any RDD containing case classes can be registered as a table.  The 
schema of the table is
     // automatically inferred using scala reflection.
-    rdd.registerTempTable("records")
+    df.registerTempTable("records")
 
     // Once tables have been registered, you can run SQL queries over them.
     println("Result of SELECT *:")
@@ -55,10 +55,10 @@ object RDDRelation {
     rddFromSql.map(row => s"Key: ${row(0)}, Value: 
${row(1)}").collect().foreach(println)
 
     // Queries can also be written using a LINQ-like Scala DSL.
-    rdd.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
+    df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
 
     // Write out an RDD as a parquet file.
-    rdd.saveAsParquetFile("pair.parquet")
+    df.saveAsParquetFile("pair.parquet")
 
     // Read in parquet file.  Parquet files are self-describing so the schmema 
is preserved.
     val parquetFile = sqlContext.parquetFile("pair.parquet")

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
 
b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 15754cd..7128deb 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -68,7 +68,7 @@ object HiveFromSpark {
 
     // You can also register RDDs as temporary tables within a HiveContext.
     val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
-    rdd.registerTempTable("records")
+    rdd.toDF.registerTempTable("records")
 
     // Queries can then join RDD data with data stored in Hive.
     println("Result of SELECT *:")

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
index 2ec2ccd..9a58486 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param._
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
 /**
@@ -100,7 +100,7 @@ private[ml] abstract class UnaryTransformer[IN, OUT, T <: 
UnaryTransformer[IN, O
   override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
     transformSchema(dataset.schema, paramMap, logging = true)
     val map = this.paramMap ++ paramMap
-    dataset.select($"*", callUDF(
-      this.createTransformFunc(map), outputDataType, dataset(map(inputCol))).as(map(outputCol)))
+    dataset.withColumn(map(outputCol),
+      callUDF(this.createTransformFunc(map), outputDataType, dataset(map(inputCol))))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index 124ab30..c5fc89f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{DeveloperApi, 
AlphaComponent}
 import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, 
PredictorParams}
 import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.{DataType, DoubleType, StructType}
 
@@ -182,24 +182,22 @@ private[ml] object ClassificationModel {
     if (map(model.rawPredictionCol) != "") {
       // output raw prediction
       val features2raw: FeaturesType => Vector = model.predictRaw
-      tmpData = tmpData.select($"*",
-        callUDF(features2raw, new VectorUDT,
-          col(map(model.featuresCol))).as(map(model.rawPredictionCol)))
+      tmpData = tmpData.withColumn(map(model.rawPredictionCol),
+        callUDF(features2raw, new VectorUDT, col(map(model.featuresCol))))
       numColsOutput += 1
       if (map(model.predictionCol) != "") {
         val raw2pred: Vector => Double = (rawPred) => {
           rawPred.toArray.zipWithIndex.maxBy(_._1)._2
         }
-        tmpData = tmpData.select($"*", callUDF(raw2pred, DoubleType,
-          col(map(model.rawPredictionCol))).as(map(model.predictionCol)))
+        tmpData = tmpData.withColumn(map(model.predictionCol),
+          callUDF(raw2pred, DoubleType, col(map(model.rawPredictionCol))))
         numColsOutput += 1
       }
     } else if (map(model.predictionCol) != "") {
       // output prediction
       val features2pred: FeaturesType => Double = model.predict
-      tmpData = tmpData.select($"*",
-        callUDF(features2pred, DoubleType,
-          col(map(model.featuresCol))).as(map(model.predictionCol)))
+      tmpData = tmpData.withColumn(map(model.predictionCol),
+        callUDF(features2pred, DoubleType, col(map(model.featuresCol))))
       numColsOutput += 1
     }
     (numColsOutput, tmpData)

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index a9a5af5..21f61d8 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -22,7 +22,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
 import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector, Vectors}
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.DoubleType
 import org.apache.spark.storage.StorageLevel
 
@@ -130,44 +130,39 @@ class LogisticRegressionModel private[ml] (
     var numColsOutput = 0
     if (map(rawPredictionCol) != "") {
       val features2raw: Vector => Vector = (features) => predictRaw(features)
-      tmpData = tmpData.select($"*",
-        callUDF(features2raw, new VectorUDT, col(map(featuresCol))).as(map(rawPredictionCol)))
+      tmpData = tmpData.withColumn(map(rawPredictionCol),
+        callUDF(features2raw, new VectorUDT, col(map(featuresCol))))
       numColsOutput += 1
     }
     if (map(probabilityCol) != "") {
       if (map(rawPredictionCol) != "") {
-        val raw2prob: Vector => Vector = { (rawPreds: Vector) =>
+        val raw2prob = udf { (rawPreds: Vector) =>
           val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
-          Vectors.dense(1.0 - prob1, prob1)
+          Vectors.dense(1.0 - prob1, prob1): Vector
         }
-        tmpData = tmpData.select($"*",
-          callUDF(raw2prob, new VectorUDT, col(map(rawPredictionCol))).as(map(probabilityCol)))
+        tmpData = tmpData.withColumn(map(probabilityCol), raw2prob(col(map(rawPredictionCol))))
       } else {
-        val features2prob: Vector => Vector = (features: Vector) => predictProbabilities(features)
-        tmpData = tmpData.select($"*",
-          callUDF(features2prob, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
+        val features2prob = udf { (features: Vector) => predictProbabilities(features) : Vector }
+        tmpData = tmpData.withColumn(map(probabilityCol), features2prob(col(map(featuresCol))))
       }
       numColsOutput += 1
     }
     if (map(predictionCol) != "") {
       val t = map(threshold)
       if (map(probabilityCol) != "") {
-        val predict: Vector => Double = { probs: Vector =>
+        val predict = udf { probs: Vector =>
           if (probs(1) > t) 1.0 else 0.0
         }
-        tmpData = tmpData.select($"*",
-          callUDF(predict, DoubleType, col(map(probabilityCol))).as(map(predictionCol)))
+        tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(probabilityCol))))
       } else if (map(rawPredictionCol) != "") {
-        val predict: Vector => Double = { rawPreds: Vector =>
+        val predict = udf { rawPreds: Vector =>
           val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
           if (prob1 > t) 1.0 else 0.0
         }
-        tmpData = tmpData.select($"*",
-          callUDF(predict, DoubleType, col(map(rawPredictionCol))).as(map(predictionCol)))
+        tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(rawPredictionCol))))
       } else {
-        val predict: Vector => Double = (features: Vector) => this.predict(features)
-        tmpData = tmpData.select($"*",
-          callUDF(predict, DoubleType, col(map(featuresCol))).as(map(predictionCol)))
+        val predict = udf { features: Vector => this.predict(features) }
+        tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(featuresCol))))
       }
       numColsOutput += 1
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
index 3851878..bd8caac 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{AlphaComponent, 
DeveloperApi}
 import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params}
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataType, StructType}
 
 
@@ -122,8 +122,8 @@ private[spark] abstract class 
ProbabilisticClassificationModel[
       val features2probs: FeaturesType => Vector = (features) => {
         tmpModel.predictProbabilities(features)
       }
-      outputData.select($"*",
-        callUDF(features2probs, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
+      outputData.withColumn(map(probabilityCol),
+        callUDF(features2probs, new VectorUDT, col(map(featuresCol))))
     } else {
       if (numColsOutput == 0) {
         this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() 
was called as NOOP" +

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index 7623ec5..ddbd648 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -23,7 +23,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.mllib.feature
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
 import org.apache.spark.sql._
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
@@ -88,7 +88,7 @@ class StandardScalerModel private[ml] (
     transformSchema(dataset.schema, paramMap, logging = true)
     val map = this.paramMap ++ paramMap
     val scale = udf((v: Vector) => { scaler.transform(v) } : Vector)
-    dataset.select($"*", scale(col(map(inputCol))).as(map(outputCol)))
+    dataset.withColumn(map(outputCol), scale(col(map(inputCol))))
   }
 
   private[ml] override def transformSchema(schema: StructType, paramMap: 
ParamMap): StructType = {

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala 
b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
index e416c1e..7daeff9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
@@ -24,7 +24,7 @@ import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataType, DoubleType, StructType}
 
 
@@ -216,7 +216,7 @@ private[spark] abstract class PredictionModel[FeaturesType, 
M <: PredictionModel
       val pred: FeaturesType => Double = (features) => {
         tmpModel.predict(features)
       }
-      dataset.select($"*", callUDF(pred, DoubleType, col(map(featuresCol))).as(map(predictionCol)))
+      dataset.withColumn(map(predictionCol), callUDF(pred, DoubleType, col(map(featuresCol))))
     } else {
       this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
         " since no output columns were set.")

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index aac4877..8d70e43 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -36,7 +36,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.mllib.optimization.NNLS
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, 
StructField, StructType}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -170,8 +170,8 @@ class ALSModel private[ml] (
   override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
     import dataset.sqlContext.implicits._
     val map = this.paramMap ++ paramMap
-    val users = userFactors.toDataFrame("id", "features")
-    val items = itemFactors.toDataFrame("id", "features")
+    val users = userFactors.toDF("id", "features")
+    val items = itemFactors.toDF("id", "features")
 
     // Register a UDF for DataFrame, and then
     // create a new column named map(predictionCol) by running the predict UDF.

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index f9142bc..dd7a946 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -102,7 +102,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
 
       // Create Parquet data.
-      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1)
+      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF
       dataRDD.saveAsParquetFile(dataPath(path))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
index 1d11896..0a358f2 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
@@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel {
 
       // Create Parquet data.
       val data = Data(weights, intercept, threshold)
-      sc.parallelize(Seq(data), 1).saveAsParquetFile(Loader.dataPath(path))
+      sc.parallelize(Seq(data), 1).toDF.saveAsParquetFile(Loader.dataPath(path))
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index a3a3b5d..c399496 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -187,8 +187,8 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ 
("rank" -> model.rank)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
-      model.userFeatures.toDataFrame("id", "features").saveAsParquetFile(userPath(path))
-      model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path))
+      model.userFeatures.toDF("id", "features").saveAsParquetFile(userPath(path))
+      model.productFeatures.toDF("id", "features").saveAsParquetFile(productPath(path))
     }
 
     def load(sc: SparkContext, path: String): MatrixFactorizationModel = {

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index f75de6f..7b27aaa 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -58,7 +58,7 @@ private[regression] object GLMRegressionModel {
 
       // Create Parquet data.
       val data = Data(weights, intercept)
-      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1)
+      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF
       // TODO: repartition with 1 partition after SPARK-5532 gets fixed
       dataRDD.saveAsParquetFile(Loader.dataPath(path))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index 373192a..5dac62b 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -197,7 +197,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] {
       val nodes = model.topNode.subtreeIterator.toSeq
       val dataRDD: DataFrame = sc.parallelize(nodes)
         .map(NodeData.apply(0, _))
-        .toDataFrame
+        .toDF
       dataRDD.saveAsParquetFile(Loader.dataPath(path))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index dbd69dc..e507f24 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -289,7 +289,7 @@ private[tree] object TreeEnsembleModel {
       // Create Parquet data.
       val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case 
(tree, treeId) =>
         tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node))
-      }.toDataFrame
+      }.toDF
       dataRDD.saveAsParquetFile(Loader.dataPath(path))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index cb7d57d..b118a8d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -358,8 +358,8 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext 
with Logging {
       .setNumUserBlocks(numUserBlocks)
       .setNumItemBlocks(numItemBlocks)
     val alpha = als.getAlpha
-    val model = als.fit(training)
-    val predictions = model.transform(test)
+    val model = als.fit(training.toDF)
+    val predictions = model.transform(test.toDF)
       .select("rating", "prediction")
       .map { case Row(rating: Float, prediction: Float) =>
         (rating.toDouble, prediction.toDouble)

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/python/docs/pyspark.sql.rst
----------------------------------------------------------------------
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index 80c6f02..e03379e 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -16,3 +16,11 @@ pyspark.sql.types module
     :members:
     :undoc-members:
     :show-inheritance:
+
+
+pyspark.sql.functions module
+------------------------
+.. automodule:: pyspark.sql.functions
+    :members:
+    :undoc-members:
+    :show-inheritance:

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 49e5c9d..06207a0 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -335,7 +335,7 @@ class VectorUDTTests(PySparkTestCase):
         sqlCtx = SQLContext(self.sc)
         rdd = self.sc.parallelize([LabeledPoint(1.0, self.dv1), 
LabeledPoint(0.0, self.sv1)])
         srdd = sqlCtx.inferSchema(rdd)
-        schema = srdd.schema()
+        schema = srdd.schema
         field = [f for f in schema.fields if f.name == "features"][0]
         self.assertEqual(field.dataType, self.udt)
         vectors = srdd.map(lambda p: p.features).collect()

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/python/pyspark/sql/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 0a5ba00..b9ffd69 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -34,9 +34,8 @@ public classes of Spark SQL:
 
 from pyspark.sql.context import SQLContext, HiveContext
 from pyspark.sql.types import Row
-from pyspark.sql.dataframe import DataFrame, GroupedData, Column, Dsl, SchemaRDD
+from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD
 
 __all__ = [
     'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row',
-    'Dsl',
 ]

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 082f1b6..7683c1b 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -38,6 +38,25 @@ except ImportError:
 __all__ = ["SQLContext", "HiveContext"]
 
 
+def _monkey_patch_RDD(sqlCtx):
+    def toDF(self, schema=None, sampleRatio=None):
+        """
+        Convert current :class:`RDD` into a :class:`DataFrame`
+
+        This is a shorthand for `sqlCtx.createDataFrame(rdd, schema, sampleRatio)`
+
+        :param schema: a StructType or list of names of columns
+        :param samplingRatio: the sample ratio of rows used for inferring
+        :return: a DataFrame
+
+        >>> rdd.toDF().collect()
+        [Row(name=u'Alice', age=1)]
+        """
+        return sqlCtx.createDataFrame(self, schema, sampleRatio)
+
+    RDD.toDF = toDF
+
+
 class SQLContext(object):
 
     """Main entry point for Spark SQL functionality.
@@ -49,15 +68,20 @@ class SQLContext(object):
     def __init__(self, sparkContext, sqlContext=None):
         """Create a new SQLContext.
 
+        It will add a method called `toDF` to :class:`RDD`, which could be
+        used to convert an RDD into a DataFrame, it's a shorthand for
+        :func:`SQLContext.createDataFrame`.
+
         :param sparkContext: The SparkContext to wrap.
         :param sqlContext: An optional JVM Scala SQLContext. If set, we do not 
instatiate a new
         SQLContext in the JVM, instead we make all calls to this object.
 
         >>> from datetime import datetime
+        >>> sqlCtx = SQLContext(sc)
         >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1L,
         ...     b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
         ...     time=datetime(2014, 8, 1, 14, 1, 5))])
-        >>> df = sqlCtx.createDataFrame(allTypes)
+        >>> df = allTypes.toDF()
         >>> df.registerTempTable("allTypes")
         >>> sqlCtx.sql('select i+1, d+1, not b, list[1], dict["s"], time, 
row.a '
         ...            'from allTypes where b and i > 0').collect()
@@ -70,6 +94,7 @@ class SQLContext(object):
         self._jsc = self._sc._jsc
         self._jvm = self._sc._jvm
         self._scala_SQLContext = sqlContext
+        _monkey_patch_RDD(self)
 
     @property
     def _ssql_ctx(self):
@@ -442,7 +467,7 @@ class SQLContext(object):
         Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')])
         Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
 
-        >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema())
+        >>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema)
         >>> sqlCtx.registerRDDAsTable(df3, "table2")
         >>> df4 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
@@ -495,7 +520,7 @@ class SQLContext(object):
         Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')])
         Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
 
-        >>> df3 = sqlCtx.jsonRDD(json, df1.schema())
+        >>> df3 = sqlCtx.jsonRDD(json, df1.schema)
         >>> sqlCtx.registerRDDAsTable(df3, "table2")
         >>> df4 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, "
@@ -800,7 +825,8 @@ def _test():
          Row(field1=2, field2="row2"),
          Row(field1=3, field2="row3")]
     )
-    globs['df'] = sqlCtx.createDataFrame(rdd)
+    _monkey_patch_RDD(sqlCtx)
+    globs['df'] = rdd.toDF()
     jsonStrings = [
         '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
         '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index b6f052e..1438fe5 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -21,21 +21,19 @@ import warnings
 import random
 import os
 from tempfile import NamedTemporaryFile
-from itertools import imap
 
 from py4j.java_collections import ListConverter, MapConverter
 
 from pyspark.context import SparkContext
-from pyspark.rdd import RDD, _prepare_for_python_RDD
-from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
-    UTF8Deserializer
+from pyspark.rdd import RDD
+from pyspark.serializers import BatchedSerializer, PickleSerializer, UTF8Deserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.traceback_utils import SCCallSiteSync
 from pyspark.sql.types import *
 from pyspark.sql.types import _create_cls, _parse_datatype_json_string
 
 
-__all__ = ["DataFrame", "GroupedData", "Column", "Dsl", "SchemaRDD"]
+__all__ = ["DataFrame", "GroupedData", "Column", "SchemaRDD"]
 
 
 class DataFrame(object):
@@ -76,6 +74,7 @@ class DataFrame(object):
         self.sql_ctx = sql_ctx
         self._sc = sql_ctx and sql_ctx._sc
         self.is_cached = False
+        self._schema = None  # initialized lazily
 
     @property
     def rdd(self):
@@ -86,7 +85,7 @@ class DataFrame(object):
         if not hasattr(self, '_lazy_rdd'):
             jrdd = self._jdf.javaToPython()
             rdd = RDD(jrdd, self.sql_ctx._sc, 
BatchedSerializer(PickleSerializer()))
-            schema = self.schema()
+            schema = self.schema
 
             def applySchema(it):
                 cls = _create_cls(schema)
@@ -216,14 +215,17 @@ class DataFrame(object):
                                           self._sc._gateway._gateway_client)
         self._jdf.save(source, jmode, joptions)
 
+    @property
     def schema(self):
         """Returns the schema of this DataFrame (represented by
         a L{StructType}).
 
-        >>> df.schema()
+        >>> df.schema
         
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))
         """
-        return _parse_datatype_json_string(self._jdf.schema().json())
+        if self._schema is None:
+            self._schema = _parse_datatype_json_string(self._jdf.schema().json())
+        return self._schema
 
     def printSchema(self):
         """Prints out the schema in the tree format.
@@ -284,7 +286,7 @@ class DataFrame(object):
         with open(tempFile.name, 'rb') as tempFile:
             rs = 
list(BatchedSerializer(PickleSerializer()).load_stream(tempFile))
         os.unlink(tempFile.name)
-        cls = _create_cls(self.schema())
+        cls = _create_cls(self.schema)
         return [cls(r) for r in rs]
 
     def limit(self, num):
@@ -310,14 +312,26 @@ class DataFrame(object):
         return self.limit(num).collect()
 
     def map(self, f):
-        """ Return a new RDD by applying a function to each Row, it's a
-        shorthand for df.rdd.map()
+        """ Return a new RDD by applying a function to each Row
+
+        It's a shorthand for df.rdd.map()
 
         >>> df.map(lambda p: p.name).collect()
         [u'Alice', u'Bob']
         """
         return self.rdd.map(f)
 
+    def flatMap(self, f):
+        """ Return a new RDD by first applying a function to all elements of 
this,
+        and then flattening the results.
+
+        It's a shorthand for df.rdd.flatMap()
+
+        >>> df.flatMap(lambda p: p.name).collect()
+        [u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']
+        """
+        return self.rdd.flatMap(f)
+
     def mapPartitions(self, f, preservesPartitioning=False):
         """
         Return a new RDD by applying a function to each partition.
@@ -378,21 +392,6 @@ class DataFrame(object):
         rdd = self._jdf.sample(withReplacement, fraction, long(seed))
         return DataFrame(rdd, self.sql_ctx)
 
-    # def takeSample(self, withReplacement, num, seed=None):
-    #     """Return a fixed-size sampled subset of this DataFrame.
-    #
-    #     >>> df = sqlCtx.inferSchema(rdd)
-    #     >>> df.takeSample(False, 2, 97)
-    #     [Row(field1=3, field2=u'row3'), Row(field1=1, field2=u'row1')]
-    #     """
-    #     seed = seed if seed is not None else random.randint(0, sys.maxint)
-    #     with SCCallSiteSync(self.context) as css:
-    #         bytesInJava = self._jdf \
-    #             .takeSampleToPython(withReplacement, num, long(seed)) \
-    #             .iterator()
-    #     cls = _create_cls(self.schema())
-    #     return map(cls, self._collect_iterator_through_file(bytesInJava))
-
     @property
     def dtypes(self):
         """Return all column names and their data types as a list.
@@ -400,7 +399,7 @@ class DataFrame(object):
         >>> df.dtypes
         [('age', 'int'), ('name', 'string')]
         """
-        return [(str(f.name), f.dataType.simpleString()) for f in self.schema().fields]
+        return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields]
 
     @property
     def columns(self):
@@ -409,7 +408,7 @@ class DataFrame(object):
         >>> df.columns
         [u'age', u'name']
         """
-        return [f.name for f in self.schema().fields]
+        return [f.name for f in self.schema.fields]
 
     def join(self, other, joinExprs=None, joinType=None):
         """
@@ -586,8 +585,8 @@ class DataFrame(object):
 
         >>> df.agg({"age": "max"}).collect()
         [Row(MAX(age#0)=5)]
-        >>> from pyspark.sql import Dsl
-        >>> df.agg(Dsl.min(df.age)).collect()
+        >>> from pyspark.sql import functions as F
+        >>> df.agg(F.min(df.age)).collect()
         [Row(MIN(age#0)=2)]
         """
         return self.groupBy().agg(*exprs)
@@ -616,18 +615,18 @@ class DataFrame(object):
         """
         return DataFrame(getattr(self._jdf, "except")(other._jdf), 
self.sql_ctx)
 
-    def addColumn(self, colName, col):
+    def withColumn(self, colName, col):
         """ Return a new :class:`DataFrame` by adding a column.
 
-        >>> df.addColumn('age2', df.age + 2).collect()
+        >>> df.withColumn('age2', df.age + 2).collect()
         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
         """
         return self.select('*', col.alias(colName))
 
-    def renameColumn(self, existing, new):
+    def withColumnRenamed(self, existing, new):
         """ Rename an existing column to a new name
 
-        >>> df.renameColumn('age', 'age2').collect()
+        >>> df.withColumnRenamed('age', 'age2').collect()
         [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
         """
         cols = [Column(_to_java_column(c), self.sql_ctx).alias(new)
@@ -635,11 +634,11 @@ class DataFrame(object):
                 for c in self.columns]
         return self.select(*cols)
 
-    def to_pandas(self):
+    def toPandas(self):
         """
         Collect all the rows and return a `pandas.DataFrame`.
 
-        >>> df.to_pandas()  # doctest: +SKIP
+        >>> df.toPandas()  # doctest: +SKIP
            age   name
         0    2  Alice
         1    5    Bob
@@ -687,10 +686,11 @@ class GroupedData(object):
                       name to aggregate methods.
 
         >>> gdf = df.groupBy(df.name)
-        >>> gdf.agg({"age": "max"}).collect()
-        [Row(name=u'Bob', MAX(age#0)=5), Row(name=u'Alice', MAX(age#0)=2)]
-        >>> from pyspark.sql import Dsl
-        >>> gdf.agg(Dsl.min(df.age)).collect()
+        >>> gdf.agg({"*": "count"}).collect()
+        [Row(name=u'Bob', COUNT(1)=1), Row(name=u'Alice', COUNT(1)=1)]
+
+        >>> from pyspark.sql import functions as F
+        >>> gdf.agg(F.min(df.age)).collect()
         [Row(MIN(age#0)=5), Row(MIN(age#0)=2)]
         """
         assert exprs, "exprs should not be empty"
@@ -742,12 +742,12 @@ class GroupedData(object):
 
 def _create_column_from_literal(literal):
     sc = SparkContext._active_spark_context
-    return sc._jvm.Dsl.lit(literal)
+    return sc._jvm.functions.lit(literal)
 
 
 def _create_column_from_name(name):
     sc = SparkContext._active_spark_context
-    return sc._jvm.Dsl.col(name)
+    return sc._jvm.functions.col(name)
 
 
 def _to_java_column(col):
@@ -767,9 +767,9 @@ def _unary_op(name, doc="unary operator"):
     return _
 
 
-def _dsl_op(name, doc=''):
+def _func_op(name, doc=''):
     def _(self):
-        jc = getattr(self._sc._jvm.Dsl, name)(self._jc)
+        jc = getattr(self._sc._jvm.functions, name)(self._jc)
         return Column(jc, self.sql_ctx)
     _.__doc__ = doc
     return _
@@ -818,7 +818,7 @@ class Column(DataFrame):
         super(Column, self).__init__(jc, sql_ctx)
 
     # arithmetic operators
-    __neg__ = _dsl_op("negate")
+    __neg__ = _func_op("negate")
     __add__ = _bin_op("plus")
     __sub__ = _bin_op("minus")
     __mul__ = _bin_op("multiply")
@@ -842,7 +842,7 @@ class Column(DataFrame):
     # so use bitwise operators as boolean operators
     __and__ = _bin_op('and')
     __or__ = _bin_op('or')
-    __invert__ = _dsl_op('not')
+    __invert__ = _func_op('not')
     __rand__ = _bin_op("and")
     __ror__ = _bin_op("or")
 
@@ -920,11 +920,11 @@ class Column(DataFrame):
         else:
             return 'Column<%s>' % self._jdf.toString()
 
-    def to_pandas(self):
+    def toPandas(self):
         """
         Return a pandas.Series from the column
 
-        >>> df.age.to_pandas()  # doctest: +SKIP
+        >>> df.age.toPandas()  # doctest: +SKIP
         0    2
         1    5
         dtype: int64
@@ -934,123 +934,6 @@ class Column(DataFrame):
         return pd.Series(data)
 
 
-def _aggregate_func(name, doc=""):
-    """ Create a function for aggregator by name"""
-    def _(col):
-        sc = SparkContext._active_spark_context
-        jc = getattr(sc._jvm.Dsl, name)(_to_java_column(col))
-        return Column(jc)
-    _.__name__ = name
-    _.__doc__ = doc
-    return staticmethod(_)
-
-
-class UserDefinedFunction(object):
-    def __init__(self, func, returnType):
-        self.func = func
-        self.returnType = returnType
-        self._broadcast = None
-        self._judf = self._create_judf()
-
-    def _create_judf(self):
-        f = self.func  # put it in closure `func`
-        func = lambda _, it: imap(lambda x: f(*x), it)
-        ser = AutoBatchedSerializer(PickleSerializer())
-        command = (func, None, ser, ser)
-        sc = SparkContext._active_spark_context
-        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
-        ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
-        jdt = ssql_ctx.parseDataType(self.returnType.json())
-        judf = sc._jvm.UserDefinedPythonFunction(f.__name__, bytearray(pickled_command), env,
-                                                 includes, sc.pythonExec, broadcast_vars,
-                                                 sc._javaAccumulator, jdt)
-        return judf
-
-    def __del__(self):
-        if self._broadcast is not None:
-            self._broadcast.unpersist()
-            self._broadcast = None
-
-    def __call__(self, *cols):
-        sc = SparkContext._active_spark_context
-        jcols = ListConverter().convert([_to_java_column(c) for c in cols],
-                                        sc._gateway._gateway_client)
-        jc = self._judf.apply(sc._jvm.PythonUtils.toSeq(jcols))
-        return Column(jc)
-
-
-class Dsl(object):
-    """
-    A collections of builtin aggregators
-    """
-    DSLS = {
-        'lit': 'Creates a :class:`Column` of literal value.',
-        'col': 'Returns a :class:`Column` based on the given column name.',
-        'column': 'Returns a :class:`Column` based on the given column name.',
-        'upper': 'Converts a string expression to upper case.',
-        'lower': 'Converts a string expression to upper case.',
-        'sqrt': 'Computes the square root of the specified float value.',
-        'abs': 'Computes the absolutle value.',
-
-        'max': 'Aggregate function: returns the maximum value of the expression in a group.',
-        'min': 'Aggregate function: returns the minimum value of the expression in a group.',
-        'first': 'Aggregate function: returns the first value in a group.',
-        'last': 'Aggregate function: returns the last value in a group.',
-        'count': 'Aggregate function: returns the number of items in a group.',
-        'sum': 'Aggregate function: returns the sum of all values in the expression.',
-        'avg': 'Aggregate function: returns the average of the values in a group.',
-        'mean': 'Aggregate function: returns the average of the values in a group.',
-        'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
-    }
-
-    for _name, _doc in DSLS.items():
-        locals()[_name] = _aggregate_func(_name, _doc)
-    del _name, _doc
-
-    @staticmethod
-    def countDistinct(col, *cols):
-        """ Return a new Column for distinct count of (col, *cols)
-
-        >>> from pyspark.sql import Dsl
-        >>> df.agg(Dsl.countDistinct(df.age, df.name).alias('c')).collect()
-        [Row(c=2)]
-
-        >>> df.agg(Dsl.countDistinct("age", "name").alias('c')).collect()
-        [Row(c=2)]
-        """
-        sc = SparkContext._active_spark_context
-        jcols = ListConverter().convert([_to_java_column(c) for c in cols],
-                                        sc._gateway._gateway_client)
-        jc = sc._jvm.Dsl.countDistinct(_to_java_column(col),
-                                       sc._jvm.PythonUtils.toSeq(jcols))
-        return Column(jc)
-
-    @staticmethod
-    def approxCountDistinct(col, rsd=None):
-        """ Return a new Column for approxiate distinct count of (col, *cols)
-
-        >>> from pyspark.sql import Dsl
-        >>> df.agg(Dsl.approxCountDistinct(df.age).alias('c')).collect()
-        [Row(c=2)]
-        """
-        sc = SparkContext._active_spark_context
-        if rsd is None:
-            jc = sc._jvm.Dsl.approxCountDistinct(_to_java_column(col))
-        else:
-            jc = sc._jvm.Dsl.approxCountDistinct(_to_java_column(col), rsd)
-        return Column(jc)
-
-    @staticmethod
-    def udf(f, returnType=StringType()):
-        """Create a user defined function (UDF)
-
-        >>> slen = Dsl.udf(lambda s: len(s), IntegerType())
-        >>> df.select(slen(df.name).alias('slen')).collect()
-        [Row(slen=5), Row(slen=3)]
-        """
-        return UserDefinedFunction(f, returnType)
-
-
 def _test():
     import doctest
     from pyspark.context import SparkContext
@@ -1059,11 +942,9 @@ def _test():
     globs = pyspark.sql.dataframe.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
     globs['sc'] = sc
-    globs['sqlCtx'] = sqlCtx = SQLContext(sc)
-    rdd2 = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)])
-    rdd3 = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)])
-    globs['df'] = sqlCtx.inferSchema(rdd2)
-    globs['df2'] = sqlCtx.inferSchema(rdd3)
+    globs['sqlCtx'] = SQLContext(sc)
+    globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF()
+    globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF()
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.dataframe, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
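
In user-facing terms, the dataframe.py changes above amount to: to_pandas
becomes toPandas, and the aggregation helpers now come from the functions
module instead of the Dsl class. A rough sketch of the resulting usage,
assuming an existing SparkContext `sc` and an installed pandas (mirroring the
doctest setup):

    from pyspark.sql import SQLContext, Row
    from pyspark.sql import functions as F

    sqlCtx = SQLContext(sc)
    df = sc.parallelize([Row(name='Alice', age=2),
                         Row(name='Bob', age=5)]).toDF()

    # GroupedData.agg still takes a dict, and now also functions-module columns
    df.groupBy(df.name).agg({"*": "count"}).collect()
    df.groupBy(df.name).agg(F.min(df.age)).collect()

    pdf = df.toPandas()   # formerly df.to_pandas()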

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
new file mode 100644
index 0000000..39aa550
--- /dev/null
+++ b/python/pyspark/sql/functions.py
@@ -0,0 +1,170 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A collection of builtin functions
+"""
+
+from itertools import imap
+
+from py4j.java_collections import ListConverter
+
+from pyspark import SparkContext
+from pyspark.rdd import _prepare_for_python_RDD
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.sql.types import StringType
+from pyspark.sql.dataframe import Column, _to_java_column
+
+
+__all__ = ['countDistinct', 'approxCountDistinct', 'udf']
+
+
+def _create_function(name, doc=""):
+    """ Create a function for aggregator by name"""
+    def _(col):
+        sc = SparkContext._active_spark_context
+        jc = getattr(sc._jvm.functions, name)(_to_java_column(col))
+        return Column(jc)
+    _.__name__ = name
+    _.__doc__ = doc
+    return _
+
+
+_functions = {
+    'lit': 'Creates a :class:`Column` of literal value.',
+    'col': 'Returns a :class:`Column` based on the given column name.',
+    'column': 'Returns a :class:`Column` based on the given column name.',
+    'upper': 'Converts a string expression to upper case.',
+    'lower': 'Converts a string expression to lower case.',
+    'sqrt': 'Computes the square root of the specified float value.',
+    'abs': 'Computes the absolute value.',
+
+    'max': 'Aggregate function: returns the maximum value of the expression in a group.',
+    'min': 'Aggregate function: returns the minimum value of the expression in a group.',
+    'first': 'Aggregate function: returns the first value in a group.',
+    'last': 'Aggregate function: returns the last value in a group.',
+    'count': 'Aggregate function: returns the number of items in a group.',
+    'sum': 'Aggregate function: returns the sum of all values in the expression.',
+    'avg': 'Aggregate function: returns the average of the values in a group.',
+    'mean': 'Aggregate function: returns the average of the values in a group.',
+    'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
+}
+
+
+for _name, _doc in _functions.items():
+    globals()[_name] = _create_function(_name, _doc)
+del _name, _doc
+__all__ += _functions.keys()
+
+
+def countDistinct(col, *cols):
+    """ Return a new Column for distinct count of `col` or `cols`
+
+    >>> df.agg(countDistinct(df.age, df.name).alias('c')).collect()
+    [Row(c=2)]
+
+    >>> df.agg(countDistinct("age", "name").alias('c')).collect()
+    [Row(c=2)]
+    """
+    sc = SparkContext._active_spark_context
+    jcols = ListConverter().convert([_to_java_column(c) for c in cols], sc._gateway._gateway_client)
+    jc = sc._jvm.functions.countDistinct(_to_java_column(col), sc._jvm.PythonUtils.toSeq(jcols))
+    return Column(jc)
+
+
+def approxCountDistinct(col, rsd=None):
+    """ Return a new Column for approximate distinct count of `col`
+
+    >>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
+    [Row(c=2)]
+    """
+    sc = SparkContext._active_spark_context
+    if rsd is None:
+        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col))
+    else:
+        jc = sc._jvm.functions.approxCountDistinct(_to_java_column(col), rsd)
+    return Column(jc)
+
+
+class UserDefinedFunction(object):
+    """
+    User defined function in Python
+    """
+    def __init__(self, func, returnType):
+        self.func = func
+        self.returnType = returnType
+        self._broadcast = None
+        self._judf = self._create_judf()
+
+    def _create_judf(self):
+        f = self.func  # put it in closure `func`
+        func = lambda _, it: imap(lambda x: f(*x), it)
+        ser = AutoBatchedSerializer(PickleSerializer())
+        command = (func, None, ser, ser)
+        sc = SparkContext._active_spark_context
+        pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
+        ssql_ctx = sc._jvm.SQLContext(sc._jsc.sc())
+        jdt = ssql_ctx.parseDataType(self.returnType.json())
+        judf = sc._jvm.UserDefinedPythonFunction(f.__name__, bytearray(pickled_command), env,
+                                                 includes, sc.pythonExec, broadcast_vars,
+                                                 sc._javaAccumulator, jdt)
+        return judf
+
+    def __del__(self):
+        if self._broadcast is not None:
+            self._broadcast.unpersist()
+            self._broadcast = None
+
+    def __call__(self, *cols):
+        sc = SparkContext._active_spark_context
+        jcols = ListConverter().convert([_to_java_column(c) for c in cols],
+                                        sc._gateway._gateway_client)
+        jc = self._judf.apply(sc._jvm.PythonUtils.toSeq(jcols))
+        return Column(jc)
+
+
+def udf(f, returnType=StringType()):
+    """Create a user defined function (UDF)
+
+    >>> slen = udf(lambda s: len(s), IntegerType())
+    >>> df.select(slen(df.name).alias('slen')).collect()
+    [Row(slen=5), Row(slen=3)]
+    """
+    return UserDefinedFunction(f, returnType)
+
+
+def _test():
+    import doctest
+    from pyspark.context import SparkContext
+    from pyspark.sql import Row, SQLContext
+    import pyspark.sql.dataframe
+    globs = pyspark.sql.dataframe.__dict__.copy()
+    sc = SparkContext('local[4]', 'PythonTest')
+    globs['sc'] = sc
+    globs['sqlCtx'] = SQLContext(sc)
+    globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF()
+    globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF()
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.dataframe, globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
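
The new pyspark/sql/functions.py above replaces the Dsl class with plain
module-level functions (lit, col, min, max, sum, countDistinct,
approxCountDistinct, udf, ...). A short usage sketch against the same `df` as
in the doctests:

    from pyspark.sql.functions import countDistinct, approxCountDistinct, udf
    from pyspark.sql.types import IntegerType

    df.agg(countDistinct(df.age, df.name).alias('c')).collect()     # exact distinct count
    df.agg(approxCountDistinct(df.age, 0.05).alias('c')).collect()  # approximate, rsd=0.05

    slen = udf(lambda s: len(s), IntegerType())   # Python UDF with an explicit return type
    df.select(slen(df.name).alias('slen')).collect()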

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 43e5c3a..aa80bca 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -96,7 +96,7 @@ class SQLTests(ReusedPySparkTestCase):
         cls.sqlCtx = SQLContext(cls.sc)
         cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
         rdd = cls.sc.parallelize(cls.testData)
-        cls.df = cls.sqlCtx.createDataFrame(rdd)
+        cls.df = rdd.toDF()
 
     @classmethod
     def tearDownClass(cls):
@@ -138,7 +138,7 @@ class SQLTests(ReusedPySparkTestCase):
         df = self.sqlCtx.jsonRDD(rdd)
         df.count()
         df.collect()
-        df.schema()
+        df.schema
 
         # cache and checkpoint
         self.assertFalse(df.is_cached)
@@ -155,11 +155,11 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_apply_schema_to_row(self):
         df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""]))
-        df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema())
+        df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
         self.assertEqual(df.collect(), df2.collect())
 
         rdd = self.sc.parallelize(range(10)).map(lambda x: Row(a=x))
-        df3 = self.sqlCtx.createDataFrame(rdd, df.schema())
+        df3 = self.sqlCtx.createDataFrame(rdd, df.schema)
         self.assertEqual(10, df3.count())
 
     def test_serialize_nested_array_and_map(self):
@@ -195,7 +195,7 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertEqual(1, result.head()[0])
 
         df2 = self.sqlCtx.createDataFrame(rdd, samplingRatio=1.0)
-        self.assertEqual(df.schema(), df2.schema())
+        self.assertEqual(df.schema, df2.schema)
         self.assertEqual({}, df2.map(lambda r: r.d).first())
         self.assertEqual([None, ""], df2.map(lambda r: r.s).collect())
         df2.registerTempTable("test2")
@@ -204,8 +204,7 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_struct_in_map(self):
         d = [Row(m={Row(i=1): Row(s="")})]
-        rdd = self.sc.parallelize(d)
-        df = self.sqlCtx.createDataFrame(rdd)
+        df = self.sc.parallelize(d).toDF()
         k, v = df.head().m.items()[0]
         self.assertEqual(1, k.i)
         self.assertEqual("", v.s)
@@ -213,8 +212,7 @@ class SQLTests(ReusedPySparkTestCase):
     def test_convert_row_to_dict(self):
         row = Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")})
         self.assertEqual(1, row.asDict()['l'][0].a)
-        rdd = self.sc.parallelize([row])
-        df = self.sqlCtx.createDataFrame(rdd)
+        df = self.sc.parallelize([row]).toDF()
         df.registerTempTable("test")
         row = self.sqlCtx.sql("select l, d from test").head()
         self.assertEqual(1, row.asDict()["l"][0].a)
@@ -223,9 +221,8 @@ class SQLTests(ReusedPySparkTestCase):
     def test_infer_schema_with_udt(self):
         from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
         row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
-        rdd = self.sc.parallelize([row])
-        df = self.sqlCtx.createDataFrame(rdd)
-        schema = df.schema()
+        df = self.sc.parallelize([row]).toDF()
+        schema = df.schema
         field = [f for f in schema.fields if f.name == "point"][0]
         self.assertEqual(type(field.dataType), ExamplePointUDT)
         df.registerTempTable("labeled_point")
@@ -238,15 +235,14 @@ class SQLTests(ReusedPySparkTestCase):
         rdd = self.sc.parallelize([row])
         schema = StructType([StructField("label", DoubleType(), False),
                              StructField("point", ExamplePointUDT(), False)])
-        df = self.sqlCtx.createDataFrame(rdd, schema)
+        df = rdd.toDF(schema)
         point = df.head().point
         self.assertEquals(point, ExamplePoint(1.0, 2.0))
 
     def test_parquet_with_udt(self):
         from pyspark.sql.tests import ExamplePoint
         row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
-        rdd = self.sc.parallelize([row])
-        df0 = self.sqlCtx.createDataFrame(rdd)
+        df0 = self.sc.parallelize([row]).toDF()
         output_dir = os.path.join(self.tempdir.name, "labeled_point")
         df0.saveAsParquetFile(output_dir)
         df1 = self.sqlCtx.parquetFile(output_dir)
@@ -280,10 +276,11 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertEqual([99, 100], sorted(g.agg({'key': 'max', 'value': 'count'}).collect()[0]))
         self.assertEqual([Row(**{"AVG(key#0)": 49.5})], g.mean().collect())
 
-        from pyspark.sql import Dsl
-        self.assertEqual((0, u'99'), tuple(g.agg(Dsl.first(df.key), Dsl.last(df.value)).first()))
-        self.assertTrue(95 < g.agg(Dsl.approxCountDistinct(df.key)).first()[0])
-        self.assertEqual(100, g.agg(Dsl.countDistinct(df.value)).first()[0])
+        from pyspark.sql import functions
+        self.assertEqual((0, u'99'),
+                         tuple(g.agg(functions.first(df.key), functions.last(df.value)).first()))
+        self.assertTrue(95 < g.agg(functions.approxCountDistinct(df.key)).first()[0])
+        self.assertEqual(100, g.agg(functions.countDistinct(df.value)).first()[0])
 
     def test_save_and_load(self):
         df = self.df
@@ -339,8 +336,7 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
             cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
         cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
         cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
-        rdd = cls.sc.parallelize(cls.testData)
-        cls.df = cls.sqlCtx.inferSchema(rdd)
+        cls.df = cls.sc.parallelize(cls.testData).toDF()
 
     @classmethod
     def tearDownClass(cls):
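
The test updates above lean on two behavioural changes: RDDs of Rows gain a
toDF() method once a SQLContext exists, and DataFrame.schema is now a property
rather than a method. A sketch of the pattern the tests now use (hypothetical
data):

    from pyspark.sql import SQLContext, Row

    sqlCtx = SQLContext(sc)          # constructing the context hooks toDF() onto RDDs
    rdd = sc.parallelize([Row(key=i, value=str(i)) for i in range(100)])

    df = rdd.toDF()                  # was sqlCtx.createDataFrame(rdd) / inferSchema(rdd)
    df2 = rdd.toDF(df.schema)        # an explicit schema can still be passed
    print(df.schema)                 # property access; df.schema() would now fail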

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/python/run-tests
----------------------------------------------------------------------
diff --git a/python/run-tests b/python/run-tests
index 077ad60..a2c2f37 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -35,7 +35,7 @@ rm -rf metastore warehouse
 function run_test() {
     echo "Running test: $1" | tee -a $LOG_FILE
 
-    SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 >> $LOG_FILE 2>&1
+    SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1
 
     FAILED=$((PIPESTATUS[0]||$FAILED))
 
@@ -67,6 +67,7 @@ function run_sql_tests() {
     run_test "pyspark/sql/types.py"
     run_test "pyspark/sql/context.py"
     run_test "pyspark/sql/dataframe.py"
+    run_test "pyspark/sql/functions.py"
     run_test "pyspark/sql/tests.py"
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index 0cf2de6..05faef8 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -137,7 +137,7 @@ private[repl] trait SparkILoopInit {
       command("import org.apache.spark.SparkContext._")
       command("import sqlContext.implicits._")
       command("import sqlContext.sql")
-      command("import org.apache.spark.sql.Dsl._")
+      command("import org.apache.spark.sql.functions._")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 201f267..529914a 100644
--- a/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.10/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -262,7 +262,7 @@ class ReplSuite extends FunSuite {
         |val sqlContext = new org.apache.spark.sql.SQLContext(sc)
         |import sqlContext.implicits._
         |case class TestCaseClass(value: Int)
-        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDataFrame.collect()
+        |sc.parallelize(1 to 10).map(x => TestCaseClass(x)).toDF.collect()
       """.stripMargin)
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 1bd2a69..7a5e94d 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -77,7 +77,7 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter)
       command("import org.apache.spark.SparkContext._")
       command("import sqlContext.implicits._")
       command("import sqlContext.sql")
-      command("import org.apache.spark.sql.Dsl._")
+      command("import org.apache.spark.sql.functions._")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index f959a50..a7cd412 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -152,7 +152,7 @@ case class MultiAlias(child: Expression, names: Seq[String])
 
   override lazy val resolved = false
 
-  override def newInstance = this
+  override def newInstance() = this
 
   override def withNullability(newNullability: Boolean) = this
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 9d5d6e7..f6ecee1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql
 
-import scala.annotation.tailrec
 import scala.language.implicitConversions
 
-import org.apache.spark.sql.Dsl.lit
+import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Subquery, Project, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedGetField
 import org.apache.spark.sql.types._
 
@@ -127,7 +126,7 @@ trait Column extends DataFrame {
    *   df.select( -df("amount") )
    *
    *   // Java:
-   *   import static org.apache.spark.sql.Dsl.*;
+   *   import static org.apache.spark.sql.functions.*;
    *   df.select( negate(col("amount") );
    * }}}
    */
@@ -140,7 +139,7 @@ trait Column extends DataFrame {
    *   df.filter( !df("isActive") )
    *
    *   // Java:
-   *   import static org.apache.spark.sql.Dsl.*;
+   *   import static org.apache.spark.sql.functions.*;
    *   df.filter( not(df.col("isActive")) );
    * }}
    */
@@ -153,7 +152,7 @@ trait Column extends DataFrame {
    *   df.filter( df("colA") === df("colB") )
    *
    *   // Java
-   *   import static org.apache.spark.sql.Dsl.*;
+   *   import static org.apache.spark.sql.functions.*;
    *   df.filter( col("colA").equalTo(col("colB")) );
    * }}}
    */
@@ -168,7 +167,7 @@ trait Column extends DataFrame {
    *   df.filter( df("colA") === df("colB") )
    *
    *   // Java
-   *   import static org.apache.spark.sql.Dsl.*;
+   *   import static org.apache.spark.sql.functions.*;
    *   df.filter( col("colA").equalTo(col("colB")) );
    * }}}
    */
@@ -182,7 +181,7 @@ trait Column extends DataFrame {
    *   df.select( !(df("colA") === df("colB")) )
    *
    *   // Java:
-   *   import static org.apache.spark.sql.Dsl.*;
+   *   import static org.apache.spark.sql.functions.*;
    *   df.filter( col("colA").notEqual(col("colB")) );
    * }}}
    */
@@ -198,7 +197,7 @@ trait Column extends DataFrame {
    *   df.select( !(df("colA") === df("colB")) )
    *
    *   // Java:
-   *   import static org.apache.spark.sql.Dsl.*;
+   *   import static org.apache.spark.sql.functions.*;
    *   df.filter( col("colA").notEqual(col("colB")) );
    * }}}
    */
@@ -213,7 +212,7 @@ trait Column extends DataFrame {
    *   people.select( people("age") > 21 )
    *
    *   // Java:
-   *   import static org.apache.spark.sql.Dsl.*;
+   *   import static org.apache.spark.sql.functions.*;
    *   people.select( people("age").gt(21) );
    * }}}
    */
@@ -228,7 +227,7 @@ trait Column extends DataFrame {
    *   people.select( people("age") > lit(21) )
    *
    *   // Java:
-   *   import static org.apache.spark.sql.Dsl.*;
+   *   import static org.apache.spark.sql.functions.*;
    *   people.select( people("age").gt(21) );
    * }}}
    */
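
The Column.scala examples above now point Java callers at the static methods
in org.apache.spark.sql.functions. On the Python side the Column operators are
routed through the same JVM functions object (see _func_op earlier in this
commit); roughly:

    from pyspark.sql import functions as F

    df.select(-df.age)                  # __neg__    -> functions.negate
    df.filter(~(df.age > 3))            # __invert__ -> functions.not
    df.filter(F.col("age") > F.lit(3))  # col()/lit() replace the old Dsl helpers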

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 4f8f19e..e21e989 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -48,7 +48,7 @@ private[sql] object DataFrame {
  * }}}
  *
  * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
- * defined in: [[DataFrame]] (this class), [[Column]], [[Dsl]] for the DSL.
+ * defined in: [[DataFrame]] (this class), [[Column]], [[functions]] for the DSL.
  *
  * To select a column from the data frame, use the apply method:
  * {{{
@@ -94,27 +94,27 @@ trait DataFrame extends RDDApi[Row] with Serializable {
     }
 
   /** Left here for backward compatibility. */
-  @deprecated("1.3.0", "use toDataFrame")
+  @deprecated("1.3.0", "use toDF")
   def toSchemaRDD: DataFrame = this
 
   /**
   * Returns the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala.
   */
  // This is declared with parentheses to prevent the Scala compiler from treating
-  // `rdd.toDataFrame("1")` as invoking this toDataFrame and then apply on the returned DataFrame.
-  def toDataFrame(): DataFrame = this
+  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
+  def toDF(): DataFrame = this
 
   /**
   * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion
   * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example:
   * {{{
   *   val rdd: RDD[(Int, String)] = ...
-   *   rdd.toDataFrame  // this implicit conversion creates a DataFrame with column name _1 and _2
-   *   rdd.toDataFrame("id", "name")  // this creates a DataFrame with column name "id" and "name"
+   *   rdd.toDF  // this implicit conversion creates a DataFrame with column name _1 and _2
+   *   rdd.toDF("id", "name")  // this creates a DataFrame with column name "id" and "name"
    * }}}
    */
   @scala.annotation.varargs
-  def toDataFrame(colNames: String*): DataFrame
+  def toDF(colNames: String*): DataFrame
 
   /** Returns the schema of this [[DataFrame]]. */
   def schema: StructType
@@ -132,7 +132,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
   def explain(extended: Boolean): Unit
 
   /** Only prints the physical plan to the console for debugging purpose. */
-  def explain(): Unit = explain(false)
+  def explain(): Unit = explain(extended = false)
 
   /**
    * Returns true if the `collect` and `take` methods can be run locally
@@ -179,11 +179,11 @@ trait DataFrame extends RDDApi[Row] with Serializable {
    *
    * {{{
    *   // Scala:
-   *   import org.apache.spark.sql.dsl._
+   *   import org.apache.spark.sql.functions._
    *   df1.join(df2, "outer", $"df1Key" === $"df2Key")
    *
    *   // Java:
-   *   import static org.apache.spark.sql.Dsl.*;
+   *   import static org.apache.spark.sql.functions.*;
    *   df1.join(df2, "outer", col("df1Key") === col("df2Key"));
    * }}}
    *
@@ -483,12 +483,12 @@ trait DataFrame extends RDDApi[Row] with Serializable {
   /**
    * Returns a new [[DataFrame]] by adding a column.
    */
-  def addColumn(colName: String, col: Column): DataFrame
+  def withColumn(colName: String, col: Column): DataFrame
 
   /**
    * Returns a new [[DataFrame]] with a column renamed.
    */
-  def renameColumn(existingName: String, newName: String): DataFrame
+  def withColumnRenamed(existingName: String, newName: String): DataFrame
 
   /**
    * Returns the first `n` rows.
@@ -520,6 +520,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
   * Returns a new RDD by applying a function to each partition of this DataFrame.
   */
  override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R]
+
   /**
    * Applies a function `f` to all rows.
    */
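
The renames in DataFrame.scala above (toDataFrame -> toDF, addColumn ->
withColumn, renameColumn -> withColumnRenamed) have matching renames on the
Python DataFrame in this commit. In sketch form, with illustrative column
names:

    df2 = df.withColumn('age2', df.age + 2)             # was addColumn
    df3 = df2.withColumnRenamed('age2', 'age_plus_2')   # was renameColumn
    df3.collect()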

http://git-wip-us.apache.org/repos/asf/spark/blob/e98dfe62/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala
new file mode 100644
index 0000000..a3187fe
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameHolder.scala
@@ -0,0 +1,30 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+/**
+ * A container for a [[DataFrame]], used for implicit conversions.
+ */
+private[sql] case class DataFrameHolder(df: DataFrame) {
+
+  // This is declared with parentheses to prevent the Scala compiler from treating
+  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
+  def toDF(): DataFrame = df
+
+  def toDF(colNames: String*): DataFrame = df.toDF(colNames :_*)
+}

