Repository: spark
Updated Branches:
  refs/heads/master f70503761 -> 4a6e78abd


http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 9bdf611..9f539c4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -31,8 +31,8 @@ import 
org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
-  * An interface for those physical operators that support codegen.
-  */
+ * An interface for those physical operators that support codegen.
+ */
 trait CodegenSupport extends SparkPlan {
 
   /** Prefix used in the current operator's variable names. */
@@ -46,10 +46,10 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
-    * Creates a metric using the specified name.
-    *
-    * @return name of the variable representing the metric
-    */
+   * Creates a metric using the specified name.
+   *
+   * @return name of the variable representing the metric
+   */
   def metricTerm(ctx: CodegenContext, name: String): String = {
     val metric = ctx.addReferenceObj(name, longMetric(name))
     val value = ctx.freshName("metricValue")
@@ -59,25 +59,25 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
-    * Whether this SparkPlan support whole stage codegen or not.
-    */
+   * Whether this SparkPlan supports whole stage codegen or not.
+   */
   def supportCodegen: Boolean = true
 
   /**
-    * Which SparkPlan is calling produce() of this one. It's itself for the 
first SparkPlan.
-    */
+   * Which SparkPlan is calling produce() of this one. It's itself for the 
first SparkPlan.
+   */
   protected var parent: CodegenSupport = null
 
   /**
-    * Returns all the RDDs of InternalRow which generates the input rows.
-    *
-    * Note: right now we support up to two RDDs.
-    */
+   * Returns all the RDDs of InternalRow which generate the input rows.
+   *
+   * Note: right now we support up to two RDDs.
+   */
   def upstreams(): Seq[RDD[InternalRow]]
 
   /**
-    * Returns Java source code to process the rows from upstream.
-    */
+   * Returns Java source code to process the rows from upstream.
+   */
   final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
     this.parent = parent
     ctx.freshNamePrefix = variablePrefix
@@ -89,28 +89,28 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
-    * Generate the Java source code to process, should be overridden by 
subclass to support codegen.
-    *
-    * doProduce() usually generate the framework, for example, aggregation 
could generate this:
-    *
-    *   if (!initialized) {
-    *     # create a hash map, then build the aggregation hash map
-    *     # call child.produce()
-    *     initialized = true;
-    *   }
-    *   while (hashmap.hasNext()) {
-    *     row = hashmap.next();
-    *     # build the aggregation results
-    *     # create variables for results
-    *     # call consume(), which will call parent.doConsume()
+   * Generate the Java source code to process, should be overridden by 
subclass to support codegen.
+   *
+   * doProduce() usually generates the framework, for example, aggregation 
could generate this:
+   *
+   *   if (!initialized) {
+   *     # create a hash map, then build the aggregation hash map
+   *     # call child.produce()
+   *     initialized = true;
+   *   }
+   *   while (hashmap.hasNext()) {
+   *     row = hashmap.next();
+   *     # build the aggregation results
+   *     # create variables for results
+   *     # call consume(), which will call parent.doConsume()
    *      if (shouldStop()) return;
-    *   }
-    */
+   *   }
+   */
   protected def doProduce(ctx: CodegenContext): String
 
   /**
-    * Consume the generated columns or row from current SparkPlan, call it's 
parent's doConsume().
-    */
+   * Consume the generated columns or row from current SparkPlan, call its 
parent's doConsume().
+   */
   final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: 
String = null): String = {
     val inputVars =
       if (row != null) {
@@ -158,9 +158,9 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
-    * Returns source code to evaluate all the variables, and clear the code of 
them, to prevent
-    * them to be evaluated twice.
-    */
+   * Returns source code to evaluate all the variables, and clear the code of 
them, to prevent
+   * them from being evaluated twice.
+   */
   protected def evaluateVariables(variables: Seq[ExprCode]): String = {
     val evaluate = variables.filter(_.code != 
"").map(_.code.trim).mkString("\n")
     variables.foreach(_.code = "")
@@ -168,9 +168,9 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
-    * Returns source code to evaluate the variables for required attributes, 
and clear the code
-    * of evaluated variables, to prevent them to be evaluated twice..
-    */
+   * Returns source code to evaluate the variables for required attributes, 
and clear the code
+   * of evaluated variables, to prevent them from being evaluated twice.
+   */
   protected def evaluateRequiredVariables(
       attributes: Seq[Attribute],
       variables: Seq[ExprCode],
@@ -194,18 +194,18 @@ trait CodegenSupport extends SparkPlan {
   def usedInputs: AttributeSet = references
 
   /**
-    * Generate the Java source code to process the rows from child SparkPlan.
-    *
-    * This should be override by subclass to support codegen.
-    *
-    * For example, Filter will generate the code like this:
-    *
-    *   # code to evaluate the predicate expression, result is isNull1 and 
value2
-    *   if (isNull1 || !value2) continue;
-    *   # call consume(), which will call parent.doConsume()
-    *
-    * Note: A plan can either consume the rows as UnsafeRow (row), or a list 
of variables (input).
-    */
+   * Generate the Java source code to process the rows from child SparkPlan.
+   *
+   * This should be overridden by subclasses to support codegen.
+   *
+   * For example, Filter will generate the code like this:
+   *
+   *   # code to evaluate the predicate expression, result is isNull1 and 
value2
+   *   if (isNull1 || !value2) continue;
+   *   # call consume(), which will call parent.doConsume()
+   *
+   * Note: A plan can either consume the rows as UnsafeRow (row), or a list of 
variables (input).
+   */
   def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): 
String = {
     throw new UnsupportedOperationException
   }
@@ -213,11 +213,11 @@ trait CodegenSupport extends SparkPlan {
 
 
 /**
-  * InputAdapter is used to hide a SparkPlan from a subtree that support 
codegen.
-  *
-  * This is the leaf node of a tree with WholeStageCodegen, is used to 
generate code that consumes
-  * an RDD iterator of InternalRow.
-  */
+ * InputAdapter is used to hide a SparkPlan from a subtree that supports 
codegen.
+ *
+ * This is the leaf node of a tree with WholeStageCodegen, is used to generate 
code that consumes
+ * an RDD iterator of InternalRow.
+ */
 case class InputAdapter(child: SparkPlan) extends UnaryNode with 
CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
@@ -260,33 +260,33 @@ object WholeStageCodegen {
 }
 
 /**
-  * WholeStageCodegen compile a subtree of plans that support codegen together 
into single Java
-  * function.
-  *
-  * Here is the call graph of to generate Java source (plan A support codegen, 
but plan B does not):
-  *
-  *   WholeStageCodegen       Plan A               FakeInput        Plan B
-  * =========================================================================
-  *
-  * -> execute()
-  *     |
-  *  doExecute() --------->   upstreams() -------> upstreams() ------> 
execute()
-  *     |
-  *     +----------------->   produce()
-  *                             |
-  *                          doProduce()  -------> produce()
-  *                                                   |
-  *                                                doProduce()
-  *                                                   |
-  *                         doConsume() <--------- consume()
-  *                             |
-  *  doConsume()  <--------  consume()
-  *
-  * SparkPlan A should override doProduce() and doConsume().
-  *
-  * doCodeGen() will create a CodeGenContext, which will hold a list of 
variables for input,
-  * used to generated code for BoundReference.
-  */
+ * WholeStageCodegen compiles a subtree of plans that support codegen together 
into a single Java
+ * function.
+ *
+ * Here is the call graph to generate Java source (plan A supports codegen, 
but plan B does not):
+ *
+ *   WholeStageCodegen       Plan A               FakeInput        Plan B
+ * =========================================================================
+ *
+ * -> execute()
+ *     |
+ *  doExecute() --------->   upstreams() -------> upstreams() ------> execute()
+ *     |
+ *     +----------------->   produce()
+ *                             |
+ *                          doProduce()  -------> produce()
+ *                                                   |
+ *                                                doProduce()
+ *                                                   |
+ *                         doConsume() <--------- consume()
+ *                             |
+ *  doConsume()  <--------  consume()
+ *
+ * SparkPlan A should override doProduce() and doConsume().
+ *
+ * doCodeGen() will create a CodeGenContext, which will hold a list of 
variables for input,
+ * used to generate code for BoundReference.
+ */
 case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with 
CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
@@ -422,8 +422,8 @@ case class WholeStageCodegen(child: SparkPlan) extends 
UnaryNode with CodegenSup
 
 
 /**
-  * Find the chained plans that support codegen, collapse them together as 
WholeStageCodegen.
-  */
+ * Find the chained plans that support codegen, collapse them together as 
WholeStageCodegen.
+ */
 case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
 
   private def supportCodegen(e: Expression): Boolean = e match {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 7d05678..8060891 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -444,8 +444,8 @@ private[execution] final case class RangeBoundOrdering(
 }
 
 /**
-  * The interface of row buffer for a partition
-  */
+ * The interface of row buffer for a partition
+ */
 private[execution] abstract class RowBuffer {
 
   /** Number of rows. */
@@ -462,8 +462,8 @@ private[execution] abstract class RowBuffer {
 }
 
 /**
-  * A row buffer based on ArrayBuffer (the number of rows is limited)
-  */
+ * A row buffer based on ArrayBuffer (the number of rows is limited)
+ */
 private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) 
extends RowBuffer {
 
   private[this] var cursor: Int = -1
@@ -493,8 +493,8 @@ private[execution] class ArrayRowBuffer(buffer: 
ArrayBuffer[UnsafeRow]) extends
 }
 
 /**
-  * An external buffer of rows based on UnsafeExternalSorter
-  */
+ * An external buffer of rows based on UnsafeExternalSorter
+ */
 private[execution] class ExternalRowBuffer(sorter: UnsafeExternalSorter, 
numFields: Int)
   extends RowBuffer {
 
@@ -654,12 +654,16 @@ private[execution] final class SlidingWindowFunctionFrame(
   /** The rows within current sliding window. */
   private[this] val buffer = new util.ArrayDeque[InternalRow]()
 
-  /** Index of the first input row with a value greater than the upper bound 
of the current
-    * output row. */
+  /**
+   * Index of the first input row with a value greater than the upper bound of 
the current
+   * output row.
+   */
   private[this] var inputHighIndex = 0
 
-  /** Index of the first input row with a value equal to or greater than the 
lower bound of the
-    * current output row. */
+  /**
+   * Index of the first input row with a value equal to or greater than the 
lower bound of the
+   * current output row.
+   */
   private[this] var inputLowIndex = 0
 
   /** Prepare the frame for calculating a new partition. Reset all variables. 
*/
@@ -763,8 +767,10 @@ private[execution] final class 
UnboundedPrecedingWindowFunctionFrame(
   /** The next row from `input`. */
   private[this] var nextRow: InternalRow = null
 
-  /** Index of the first input row with a value greater than the upper bound 
of the current
-   * output row. */
+  /**
+   * Index of the first input row with a value greater than the upper bound of 
the current
+   * output row.
+   */
   private[this] var inputIndex = 0
 
   /** Prepare the frame for calculating a new partition. */
@@ -819,8 +825,10 @@ private[execution] final class 
UnboundedFollowingWindowFunctionFrame(
   /** Rows of the partition currently being processed. */
   private[this] var input: RowBuffer = null
 
-  /** Index of the first input row with a value equal to or greater than the 
lower bound of the
-   * current output row. */
+  /**
+   * Index of the first input row with a value equal to or greater than the 
lower bound of the
+   * current output row.
+   */
   private[this] var inputIndex = 0
 
   /** Prepare the frame for calculating a new partition. */

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index 15627a7..042c731 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -47,17 +47,17 @@ abstract class AggregationIterator(
   ///////////////////////////////////////////////////////////////////////////
 
   /**
-    * The following combinations of AggregationMode are supported:
-    * - Partial
-    * - PartialMerge (for single distinct)
-    * - Partial and PartialMerge (for single distinct)
-    * - Final
-    * - Complete (for SortBasedAggregate with functions that does not support 
Partial)
-    * - Final and Complete (currently not used)
-    *
-    * TODO: AggregateMode should have only two modes: Update and Merge, 
AggregateExpression
-    * could have a flag to tell it's final or not.
-    */
+   * The following combinations of AggregationMode are supported:
+   * - Partial
+   * - PartialMerge (for single distinct)
+   * - Partial and PartialMerge (for single distinct)
+   * - Final
+   * - Complete (for SortBasedAggregate with functions that do not support 
Partial)
+   * - Final and Complete (currently not used)
+   *
+   * TODO: AggregateMode should have only two modes: Update and Merge, 
AggregateExpression
+   * could have a flag to tell it's final or not.
+   */
   {
     val modes = aggregateExpressions.map(_.mode).distinct.toSet
     require(modes.size <= 2,

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 8f97498..de1491d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -46,9 +46,9 @@ class SortBasedAggregationIterator(
     newMutableProjection) {
 
   /**
-    * Creates a new aggregation buffer and initializes buffer values
-    * for all aggregate functions.
-    */
+   * Creates a new aggregation buffer and initializes buffer values
+   * for all aggregate functions.
+   */
   private def newBuffer: MutableRow = {
     val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
     val bufferRowSize: Int = bufferSchema.length

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 7c215d1..60027ed 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -266,8 +266,8 @@ case class TungstenAggregate(
   private var sorterTerm: String = _
 
   /**
-    * This is called by generated Java class, should be public.
-    */
+   * This is called by generated Java class, should be public.
+   */
   def createHashMap(): UnsafeFixedWidthAggregationMap = {
     // create initialized aggregate buffer
     val initExpr = declFunctions.flatMap(f => f.initialValues)
@@ -286,15 +286,15 @@ case class TungstenAggregate(
   }
 
   /**
-    * This is called by generated Java class, should be public.
-    */
+   * This is called by generated Java class, should be public.
+   */
   def createUnsafeJoiner(): UnsafeRowJoiner = {
     GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
   }
 
   /**
-    * Called by generated Java class to finish the aggregate and return a 
KVIterator.
-    */
+   * Called by generated Java class to finish the aggregate and return a 
KVIterator.
+   */
   def finishAggregate(
       hashMap: UnsafeFixedWidthAggregationMap,
       sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = {
@@ -372,8 +372,8 @@ case class TungstenAggregate(
   }
 
   /**
-    * Generate the code for output.
-    */
+   * Generate the code for output.
+   */
   private def generateResultCode(
       ctx: CodegenContext,
       keyTerm: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index f3514cd..159fdc9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -168,10 +168,10 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       private[this] var reader: RecordReader[Void, V] = null
 
       /**
-        * If the format is ParquetInputFormat, try to create the optimized 
RecordReader. If this
-        * fails (for example, unsupported schema), try with the normal reader.
-        * TODO: plumb this through a different way?
-        */
+       * If the format is ParquetInputFormat, try to create the optimized 
RecordReader. If this
+       * fails (for example, unsupported schema), try with the normal reader.
+       * TODO: plumb this through a different way?
+       */
       if (enableVectorizedParquetReader &&
         format.getClass.getName == 
"org.apache.parquet.hadoop.ParquetInputFormat") {
         val parquetReader: VectorizedParquetRecordReader = new 
VectorizedParquetRecordReader()

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index 797f740..ea843a1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -33,11 +33,11 @@ import org.apache.spark.unsafe.types.UTF8String
 private[csv] object CSVInferSchema {
 
   /**
-    * Similar to the JSON schema inference
-    *     1. Infer type of each row
-    *     2. Merge row types to find common type
-    *     3. Replace any null types with string type
-    */
+   * Similar to the JSON schema inference
+   *     1. Infer type of each row
+   *     2. Merge row types to find common type
+   *     3. Replace any null types with string type
+   */
   def infer(
       tokenRdd: RDD[Array[String]],
       header: Array[String],
@@ -75,9 +75,9 @@ private[csv] object CSVInferSchema {
   }
 
   /**
-    * Infer type of string field. Given known type Double, and a string "1", 
there is no
-    * point checking if it is an Int, as the final type must be Double or 
higher.
-    */
+   * Infer type of string field. Given known type Double, and a string "1", 
there is no
+   * point checking if it is an Int, as the final type must be Double or 
higher.
+   */
   def inferField(typeSoFar: DataType, field: String, nullValue: String = ""): 
DataType = {
     if (field == null || field.isEmpty || field == nullValue) {
       typeSoFar
@@ -142,9 +142,9 @@ private[csv] object CSVInferSchema {
   private val numericPrecedence: IndexedSeq[DataType] = 
HiveTypeCoercion.numericPrecedence
 
   /**
-    * Copied from internal Spark api
-    * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]]
-    */
+   * Copied from internal Spark api
+   * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]]
+   */
   val findTightestCommonType: (DataType, DataType) => Option[DataType] = {
     case (t1, t2) if t1 == t2 => Some(t1)
     case (NullType, t1) => Some(t1)

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index c0d6f6f..34fcbdf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -38,8 +38,8 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.collection.BitSet
 
 /**
-  * Provides access to CSV data from pure SQL statements.
-  */
+ * Provides access to CSV data from pure SQL statements.
+ */
 class DefaultSource extends FileFormat with DataSourceRegister {
 
   override def shortName(): String = "csv"

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 877e159..2e88d58 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -51,11 +51,11 @@ case class DescribeCommand(
 }
 
 /**
-  * Used to represent the operation of create table using a data source.
+ * Used to represent the operation of create table using a data source.
  *
-  * @param allowExisting If it is true, we will do nothing when the table 
already exists.
-  *                      If it is false, an exception will be thrown
-  */
+ * @param allowExisting If it is true, we will do nothing when the table 
already exists.
+ *                      If it is false, an exception will be thrown
+ */
 case class CreateTableUsing(
     tableIdent: TableIdentifier,
     userSpecifiedSchema: Option[StructType],

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 0ed1ed4..41e566c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -122,8 +122,8 @@ case class BroadcastHashJoin(
   }
 
   /**
-    * Returns a tuple of Broadcast of HashedRelation and the variable name for 
it.
-    */
+   * Returns a tuple of Broadcast of HashedRelation and the variable name for 
it.
+   */
   private def prepareBroadcast(ctx: CodegenContext): 
(Broadcast[HashedRelation], String) = {
     // create a name for HashedRelation
     val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
@@ -139,9 +139,9 @@ case class BroadcastHashJoin(
   }
 
   /**
-    * Returns the code for generating join key for stream side, and expression 
of whether the key
-    * has any null in it or not.
-    */
+   * Returns the code for generating join key for stream side, and expression 
of whether the key
+   * has any null in it or not.
+   */
   private def genStreamSideJoinKey(
       ctx: CodegenContext,
       input: Seq[ExprCode]): (ExprCode, String) = {
@@ -160,8 +160,8 @@ case class BroadcastHashJoin(
   }
 
   /**
-    * Generates the code for variable of build side.
-    */
+   * Generates the code for variable of build side.
+   */
   private def genBuildSideVars(ctx: CodegenContext, matched: String): 
Seq[ExprCode] = {
     ctx.currentVars = null
     ctx.INPUT_ROW = matched
@@ -188,8 +188,8 @@ case class BroadcastHashJoin(
   }
 
   /**
-    * Generates the code for Inner join.
-    */
+   * Generates the code for Inner join.
+   */
   private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String 
= {
     val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
     val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
@@ -254,8 +254,8 @@ case class BroadcastHashJoin(
 
 
   /**
-    * Generates the code for left or right outer join.
-    */
+   * Generates the code for left or right outer join.
+   */
   private def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String 
= {
     val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
     val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index fb65b50..edb4c5a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -28,10 +28,10 @@ import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 /**
-  * An optimized CartesianRDD for UnsafeRow, which will cache the rows from 
second child RDD,
-  * will be much faster than building the right partition for every row in 
left RDD, it also
-  * materialize the right RDD (in case of the right RDD is nondeterministic).
-  */
+ * An optimized CartesianRDD for UnsafeRow, which will cache the rows from 
second child RDD,
+ * will be much faster than building the right partition for every row in left 
RDD, it also
+ * materializes the right RDD (in case the right RDD is nondeterministic).
+ */
 private[spark]
 class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], 
numFieldsOfRight: Int)
   extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 5f42d07..c298b7d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -64,10 +64,10 @@ trait HashJoin {
   }
 
   /**
-    * Try to rewrite the key as LongType so we can use getLong(), if they key 
can fit with a long.
-    *
-    * If not, returns the original expressions.
-    */
+   * Try to rewrite the key as LongType so we can use getLong(), if the key 
can fit with a long.
+   *
+   * If not, returns the original expressions.
+   */
   def rewriteKeyExpr(keys: Seq[Expression]): Seq[Expression] = {
     var keyExpr: Expression = null
     var width = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index dc4793e..91c470d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -38,20 +38,20 @@ import org.apache.spark.util.collection.CompactBuffer
  */
 private[execution] sealed trait HashedRelation {
   /**
-    * Returns matched rows.
-    */
+   * Returns matched rows.
+   */
   def get(key: InternalRow): Seq[InternalRow]
 
   /**
-    * Returns matched rows for a key that has only one column with LongType.
-    */
+   * Returns matched rows for a key that has only one column with LongType.
+   */
   def get(key: Long): Seq[InternalRow] = {
     throw new UnsupportedOperationException
   }
 
   /**
-    * Returns the size of used memory.
-    */
+   * Returns the size of used memory.
+   */
   def getMemorySize: Long = 1L  // to make the test happy
 
   /**
@@ -77,20 +77,20 @@ private[execution] sealed trait HashedRelation {
 }
 
 /**
-  * Interface for a hashed relation that have only one row per key.
-  *
-  * We should call getValue() for better performance.
-  */
+ * Interface for a hashed relation that have only one row per key.
+ *
+ * We should call getValue() for better performance.
+ */
 private[execution] trait UniqueHashedRelation extends HashedRelation {
 
   /**
-    * Returns the matched single row.
-    */
+   * Returns the matched single row.
+   */
   def getValue(key: InternalRow): InternalRow
 
   /**
-    * Returns the matched single row with key that have only one column of 
LongType.
-    */
+   * Returns the matched single row with key that have only one column of 
LongType.
+   */
   def getValue(key: Long): InternalRow = {
     throw new UnsupportedOperationException
   }
@@ -345,8 +345,8 @@ private[joins] object UnsafeHashedRelation {
 }
 
 /**
-  * An interface for a hashed relation that the key is a Long.
-  */
+ * An interface for a hashed relation that the key is a Long.
+ */
 private[joins] trait LongHashedRelation extends HashedRelation {
   override def get(key: InternalRow): Seq[InternalRow] = {
     get(key.getLong(0))
@@ -396,26 +396,26 @@ private[joins] final class UniqueLongHashedRelation(
 }
 
 /**
-  * A relation that pack all the rows into a byte array, together with offsets 
and sizes.
-  *
-  * All the bytes of UnsafeRow are packed together as `bytes`:
-  *
-  *  [  Row0  ][  Row1  ][] ... [  RowN  ]
-  *
-  * With keys:
-  *
-  *   start    start+1   ...       start+N
-  *
-  * `offsets` are offsets of UnsafeRows in the `bytes`
-  * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this 
key.
-  *
-  *  For example, two UnsafeRows (24 bytes and 32 bytes), with keys as 3 and 5 
will stored as:
-  *
-  *  start   = 3
-  *  offsets = [0, 0, 24]
-  *  sizes   = [24, 0, 32]
-  *  bytes   = [0 - 24][][24 - 56]
-  */
+ * A relation that pack all the rows into a byte array, together with offsets 
and sizes.
+ *
+ * All the bytes of UnsafeRow are packed together as `bytes`:
+ *
+ *  [  Row0  ][  Row1  ][] ... [  RowN  ]
+ *
+ * With keys:
+ *
+ *   start    start+1   ...       start+N
+ *
+ * `offsets` are offsets of UnsafeRows in the `bytes`
+ * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this key.
+ *
+ *  For example, two UnsafeRows (24 bytes and 32 bytes), with keys as 3 and 5 
will stored as:
+ *
+ *  start   = 3
+ *  offsets = [0, 0, 24]
+ *  sizes   = [24, 0, 32]
+ *  bytes   = [0 - 24][][24 - 56]
+ */
 private[joins] final class LongArrayRelation(
     private var numFields: Int,
     private var start: Long,
@@ -483,8 +483,8 @@ private[joins] final class LongArrayRelation(
 }
 
 /**
-  * Create hashed relation with key that is long.
-  */
+ * Create hashed relation with key that is long.
+ */
 private[joins] object LongHashedRelation {
 
   val DENSE_FACTOR = 0.2

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 60bd8ea..0e7b2f2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -256,9 +256,9 @@ case class SortMergeJoin(
   }
 
   /**
-    * Generate a function to scan both left and right to find a match, returns 
the term for
-    * matched one row from left side and buffered rows from right side.
-    */
+   * Generate a function to scan both left and right to find a match, returns 
the term for
+   * matched one row from left side and buffered rows from right side.
+   */
   private def genScanner(ctx: CodegenContext): (String, String) = {
     // Create class member for next row from both sides.
     val leftRow = ctx.freshName("leftRow")
@@ -341,12 +341,12 @@ case class SortMergeJoin(
   }
 
   /**
-    * Creates variables for left part of result row.
-    *
-    * In order to defer the access after condition and also only access once 
in the loop,
-    * the variables should be declared separately from accessing the columns, 
we can't use the
-    * codegen of BoundReference here.
-    */
+   * Creates variables for left part of result row.
+   *
+   * In order to defer the access after condition and also only access once in 
the loop,
+   * the variables should be declared separately from accessing the columns, 
we can't use the
+   * codegen of BoundReference here.
+   */
   private def createLeftVars(ctx: CodegenContext, leftRow: String): 
Seq[ExprCode] = {
     ctx.INPUT_ROW = leftRow
     left.output.zipWithIndex.map { case (a, i) =>
@@ -370,9 +370,9 @@ case class SortMergeJoin(
   }
 
   /**
-    * Creates the variables for right part of result row, using 
BoundReference, since the right
-    * part are accessed inside the loop.
-    */
+   * Creates the variables for right part of result row, using BoundReference, 
since the right
+   * part are accessed inside the loop.
+   */
   private def createRightVar(ctx: CodegenContext, rightRow: String): 
Seq[ExprCode] = {
     ctx.INPUT_ROW = rightRow
     right.output.zipWithIndex.map { case (a, i) =>
@@ -381,12 +381,12 @@ case class SortMergeJoin(
   }
 
   /**
-    * Splits variables based on whether it's used by condition or not, returns 
the code to create
-    * these variables before the condition and after the condition.
-    *
-    * Only a few columns are used by condition, then we can skip the accessing 
of those columns
-    * that are not used by condition also filtered out by condition.
-    */
+   * Splits variables based on whether it's used by condition or not, returns 
the code to create
+   * these variables before the condition and after the condition.
+   *
+   * Only a few columns are used by condition, then we can skip the accessing 
of those columns
+   * that are not used by condition also filtered out by condition.
+   */
   private def splitVarsByCondition(
       attributes: Seq[Attribute],
       variables: Seq[ExprCode]): (String, String) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 998eb82..8ece3c9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -468,10 +468,10 @@ private[state] class HDFSBackedStateStoreProvider(
   }
 
   /**
-    * Clean up old snapshots and delta files that are not needed any more. It 
ensures that last
-    * few versions of the store can be recovered from the files, so 
re-executed RDD operations
-    * can re-apply updates on the past versions of the store.
-    */
+   * Clean up old snapshots and delta files that are not needed any more. It 
ensures that last
+   * few versions of the store can be recovered from the files, so re-executed 
RDD operations
+   * can re-apply updates on the past versions of the store.
+   */
   private[state] def cleanup(): Unit = {
     try {
       val files = fetchFiles()

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 24a01f5..012b125 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -45,8 +45,8 @@ private[ui] case class SparkPlanGraph(
   }
 
   /**
-    * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
-    */
+   * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
+   */
   val allNodes: Seq[SparkPlanGraphNode] = {
     nodes.flatMap {
       case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index baf947d..da58ba2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -332,95 +332,94 @@ object functions {
   }
 
   /**
-    * Aggregate function: returns the first value in a group.
-    *
-    * The function by default returns the first values it sees. It will return 
the first non-null
-    * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
-    *
-    * @group agg_funcs
-    * @since 2.0.0
-    */
+   * Aggregate function: returns the first value in a group.
+   *
+   * The function by default returns the first values it sees. It will return 
the first non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
+   *
+   * @group agg_funcs
+   * @since 2.0.0
+   */
   def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
     new First(e.expr, Literal(ignoreNulls))
   }
 
   /**
-    * Aggregate function: returns the first value of a column in a group.
-    *
-    * The function by default returns the first values it sees. It will return 
the first non-null
-    * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
-    *
-    * @group agg_funcs
-    * @since 2.0.0
-    */
+   * Aggregate function: returns the first value of a column in a group.
+   *
+   * The function by default returns the first values it sees. It will return 
the first non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
+   *
+   * @group agg_funcs
+   * @since 2.0.0
+   */
   def first(columnName: String, ignoreNulls: Boolean): Column = {
     first(Column(columnName), ignoreNulls)
   }
 
   /**
-    * Aggregate function: returns the first value in a group.
-    *
-    * The function by default returns the first values it sees. It will return 
the first non-null
-    * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
-    *
-    * @group agg_funcs
-    * @since 1.3.0
-    */
+   * Aggregate function: returns the first value in a group.
+   *
+   * The function by default returns the first values it sees. It will return 
the first non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
+   *
+   * @group agg_funcs
+   * @since 1.3.0
+   */
   def first(e: Column): Column = first(e, ignoreNulls = false)
 
   /**
-    * Aggregate function: returns the first value of a column in a group.
-    *
-    * The function by default returns the first values it sees. It will return 
the first non-null
-    * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
-    *
-    * @group agg_funcs
-    * @since 1.3.0
-    */
+   * Aggregate function: returns the first value of a column in a group.
+   *
+   * The function by default returns the first values it sees. It will return 
the first non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
+   *
+   * @group agg_funcs
+   * @since 1.3.0
+   */
   def first(columnName: String): Column = first(Column(columnName))
 
-
   /**
-    * Aggregate function: indicates whether a specified column in a GROUP BY 
list is aggregated
-    * or not, returns 1 for aggregated or 0 for not aggregated in the result 
set.
-    *
-    * @group agg_funcs
-    * @since 2.0.0
-    */
+   * Aggregate function: indicates whether a specified column in a GROUP BY 
list is aggregated
+   * or not, returns 1 for aggregated or 0 for not aggregated in the result 
set.
+   *
+   * @group agg_funcs
+   * @since 2.0.0
+   */
   def grouping(e: Column): Column = Column(Grouping(e.expr))
 
   /**
-    * Aggregate function: indicates whether a specified column in a GROUP BY 
list is aggregated
-    * or not, returns 1 for aggregated or 0 for not aggregated in the result 
set.
-    *
-    * @group agg_funcs
-    * @since 2.0.0
-    */
+   * Aggregate function: indicates whether a specified column in a GROUP BY 
list is aggregated
+   * or not, returns 1 for aggregated or 0 for not aggregated in the result 
set.
+   *
+   * @group agg_funcs
+   * @since 2.0.0
+   */
   def grouping(columnName: String): Column = grouping(Column(columnName))
 
   /**
-    * Aggregate function: returns the level of grouping, equals to
-    *
-    *   (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
-    *
-    * Note: the list of columns should match with grouping columns exactly, or 
empty (means all the
-    * grouping columns).
-    *
-    * @group agg_funcs
-    * @since 2.0.0
-    */
+   * Aggregate function: returns the level of grouping, equals to
+   *
+   *   (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
+   *
+   * Note: the list of columns should match with grouping columns exactly, or 
empty (means all the
+   * grouping columns).
+   *
+   * @group agg_funcs
+   * @since 2.0.0
+   */
   def grouping_id(cols: Column*): Column = Column(GroupingID(cols.map(_.expr)))
 
   /**
-    * Aggregate function: returns the level of grouping, equals to
-    *
-    *   (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
-    *
-    * Note: the list of columns should match with grouping columns exactly.
-    *
-    * @group agg_funcs
-    * @since 2.0.0
-    */
+   * Aggregate function: returns the level of grouping, equals to
+   *
+   *   (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
+   *
+   * Note: the list of columns should match with grouping columns exactly.
+   *
+   * @group agg_funcs
+   * @since 2.0.0
+   */
   def grouping_id(colName: String, colNames: String*): Column = {
     grouping_id((Seq(colName) ++ colNames).map(n => Column(n)) : _*)
   }
@@ -442,51 +441,51 @@ object functions {
   def kurtosis(columnName: String): Column = kurtosis(Column(columnName))
 
   /**
-    * Aggregate function: returns the last value in a group.
-    *
-    * The function by default returns the last values it sees. It will return 
the last non-null
-    * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
-    *
-    * @group agg_funcs
-    * @since 2.0.0
-    */
+   * Aggregate function: returns the last value in a group.
+   *
+   * The function by default returns the last values it sees. It will return 
the last non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
+   *
+   * @group agg_funcs
+   * @since 2.0.0
+   */
   def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
     new Last(e.expr, Literal(ignoreNulls))
   }
 
   /**
-    * Aggregate function: returns the last value of the column in a group.
-    *
-    * The function by default returns the last values it sees. It will return 
the last non-null
-    * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
-    *
-    * @group agg_funcs
-    * @since 2.0.0
-    */
+   * Aggregate function: returns the last value of the column in a group.
+   *
+   * The function by default returns the last values it sees. It will return 
the last non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
+   *
+   * @group agg_funcs
+   * @since 2.0.0
+   */
   def last(columnName: String, ignoreNulls: Boolean): Column = {
     last(Column(columnName), ignoreNulls)
   }
 
   /**
-    * Aggregate function: returns the last value in a group.
-    *
-    * The function by default returns the last values it sees. It will return 
the last non-null
-    * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
-    *
-    * @group agg_funcs
-    * @since 1.3.0
-    */
+   * Aggregate function: returns the last value in a group.
+   *
+   * The function by default returns the last values it sees. It will return 
the last non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
+   *
+   * @group agg_funcs
+   * @since 1.3.0
+   */
   def last(e: Column): Column = last(e, ignoreNulls = false)
 
   /**
-    * Aggregate function: returns the last value of the column in a group.
-    *
-    * The function by default returns the last values it sees. It will return 
the last non-null
-    * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
-    *
-    * @group agg_funcs
-    * @since 1.3.0
-    */
+   * Aggregate function: returns the last value of the column in a group.
+   *
+   * The function by default returns the last values it sees. It will return 
the last non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, 
then null is returned.
+   *
+   * @group agg_funcs
+   * @since 1.3.0
+   */
   def last(columnName: String): Column = last(Column(columnName), ignoreNulls 
= false)
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index e8834d0..14e1471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -152,19 +152,19 @@ trait StreamSinkProvider {
 @DeveloperApi
 trait CreatableRelationProvider {
   /**
-    * Creates a relation with the given parameters based on the contents of 
the given
-    * DataFrame. The mode specifies the expected behavior of createRelation 
when
-    * data already exists.
-    * Right now, there are three modes, Append, Overwrite, and ErrorIfExists.
-    * Append mode means that when saving a DataFrame to a data source, if data 
already exists,
-    * contents of the DataFrame are expected to be appended to existing data.
-    * Overwrite mode means that when saving a DataFrame to a data source, if 
data already exists,
-    * existing data is expected to be overwritten by the contents of the 
DataFrame.
-    * ErrorIfExists mode means that when saving a DataFrame to a data source,
-    * if data already exists, an exception is expected to be thrown.
-     *
-     * @since 1.3.0
-    */
+   * Creates a relation with the given parameters based on the contents of the 
given
+   * DataFrame. The mode specifies the expected behavior of createRelation when
+   * data already exists.
+   * Right now, there are three modes, Append, Overwrite, and ErrorIfExists.
+   * Append mode means that when saving a DataFrame to a data source, if data 
already exists,
+   * contents of the DataFrame are expected to be appended to existing data.
+   * Overwrite mode means that when saving a DataFrame to a data source, if 
data already exists,
+   * existing data is expected to be overwritten by the contents of the 
DataFrame.
+   * ErrorIfExists mode means that when saving a DataFrame to a data source,
+   * if data already exists, an exception is expected to be thrown.
+   *
+   * @since 1.3.0
+   */
   def createRelation(
       sqlContext: SQLContext,
       mode: SaveMode,

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 854a662..d160f8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -286,8 +286,8 @@ abstract class QueryTest extends PlanTest {
   }
 
   /**
-    * Asserts that a given [[Dataset]] does not have missing inputs in all the 
analyzed plans.
-    */
+   * Asserts that a given [[Dataset]] does not have missing inputs in all the 
analyzed plans.
+   */
   def assertEmptyMissingInput(query: Dataset[_]): Unit = {
     assert(query.queryExecution.analyzed.missingInput.isEmpty,
       s"The analyzed logical plan has missing inputs: 
${query.queryExecution.analyzed}")

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 5590679..289e1b6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -32,10 +32,10 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
 import org.apache.spark.util.Benchmark
 
 /**
-  * Benchmark to measure whole stage codegen performance.
-  * To run this:
-  *  build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
-  */
+ * Benchmark to measure whole stage codegen performance.
+ * To run this:
+ *  build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
+ */
 class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
     .set("spark.sql.shuffle.partitions", "1")

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
index dc54883..aaeecef 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources.csv
 import org.apache.spark.SparkFunSuite
 
 /**
-  * test cases for StringIteratorReader
-  */
+ * test cases for StringIteratorReader
+ */
 class CSVParserSuite extends SparkFunSuite {
 
   private def readAll(iter: Iterator[String]) = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index c1e151d..ac37e8e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -497,9 +497,10 @@ class StreamingContext private[streaming] (
     new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
   }
 
-  /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object 
for
-    * receiving system events related to streaming.
-    */
+  /**
+   * Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object 
for
+   * receiving system events related to streaming.
+   */
   def addStreamingListener(streamingListener: StreamingListener) {
     scheduler.listenerBus.addListener(streamingListener)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 05f4da6..922e4a5 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -517,9 +517,10 @@ class JavaStreamingContext(val ssc: StreamingContext) 
extends Closeable {
     ssc.remember(duration)
   }
 
-  /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object 
for
-    * receiving system events related to streaming.
-    */
+  /**
+   * Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object 
for
+   * receiving system events related to streaming.
+   */
   def addStreamingListener(streamingListener: StreamingListener) {
     ssc.addStreamingListener(streamingListener)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index 0a861f2..fbac488 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -22,17 +22,18 @@ import com.google.common.util.concurrent.{RateLimiter => 
GuavaRateLimiter}
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 
-/** Provides waitToPush() method to limit the rate at which receivers consume 
data.
-  *
-  * waitToPush method will block the thread if too many messages have been 
pushed too quickly,
-  * and only return when a new message has been pushed. It assumes that only 
one message is
-  * pushed at a time.
-  *
-  * The spark configuration spark.streaming.receiver.maxRate gives the maximum 
number of messages
-  * per second that each receiver will accept.
-  *
-  * @param conf spark configuration
-  */
+/**
+ * Provides waitToPush() method to limit the rate at which receivers consume 
data.
+ *
+ * waitToPush method will block the thread if too many messages have been 
pushed too quickly,
+ * and only return when a new message has been pushed. It assumes that only 
one message is
+ * pushed at a time.
+ *
+ * The spark configuration spark.streaming.receiver.maxRate gives the maximum 
number of messages
+ * per second that each receiver will accept.
+ *
+ * @param conf spark configuration
+ */
 private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
 
   // treated as an upper limit

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 66d5ffb..0baedaf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -21,9 +21,10 @@ import scala.collection.mutable.HashSet
 
 import org.apache.spark.streaming.Time
 
-/** Class representing a set of Jobs
-  * belong to the same batch.
-  */
+/**
+ * Class representing a set of Jobs
+ * belong to the same batch.
+ */
 private[streaming]
 case class JobSet(
     time: Time,

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
----------------------------------------------------------------------
diff --git 
a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala 
b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
index 0df3c50..c9058ff 100644
--- a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
@@ -91,10 +91,11 @@ object GenerateMIMAIgnore {
     (ignoredClasses.flatMap(c => Seq(c, c.replace("$", "#"))).toSet, 
ignoredMembers.toSet)
   }
 
-  /** Scala reflection does not let us see inner function even if they are 
upgraded
-    * to public for some reason. So had to resort to java reflection to get 
all inner
-    * functions with $$ in there name.
-    */
+  /**
+   * Scala reflection does not let us see inner function even if they are 
upgraded
+   * to public for some reason. So had to resort to java reflection to get all 
inner
+   * functions with $$ in there name.
+   */
   def getInnerFunctions(classSymbol: unv.ClassSymbol): Seq[String] = {
     try {
       Class.forName(classSymbol.fullName, false, 
classLoader).getMethods.map(_.getName)

http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 5af2c29..4b36da3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -135,8 +135,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   }
 
   /**
-    * Obtains token for the Hive metastore and adds them to the credentials.
-    */
+   * Obtains token for the Hive metastore and adds them to the credentials.
+   */
   def obtainTokenForHiveMetastore(
       sparkConf: SparkConf,
       conf: Configuration,
@@ -149,8 +149,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   }
 
   /**
-    * Obtain a security token for HBase.
-    */
+   * Obtain a security token for HBase.
+   */
   def obtainTokenForHBase(
       sparkConf: SparkConf,
       conf: Configuration,
@@ -164,10 +164,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
   }
 
   /**
-    * Return whether delegation tokens should be retrieved for the given 
service when security is
-    * enabled. By default, tokens are retrieved, but that behavior can be 
changed by setting
-    * a service-specific configuration.
-    */
+   * Return whether delegation tokens should be retrieved for the given 
service when security is
+   * enabled. By default, tokens are retrieved, but that behavior can be 
changed by setting
+   * a service-specific configuration.
+   */
   private def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
     conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to