Repository: spark
Updated Branches:
  refs/heads/master 9d4e6212f -> ac2a26d09


[SPARK-16884] Move DataSourceScanExec out of ExistingRDD.scala file

## What changes were proposed in this pull request?

This moves DataSourceScanExec into its own file so it is easier to find, especially
now that it no longer necessarily depends on an existing RDD.  cc davies
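
As background on one piece of the moved code: `FileSourceScanExec#createNonBucketedReadRDD` (in the diff below) packs file splits into read partitions with a greedy first-fit-decreasing pass. The following is a standalone, simplified sketch of just that packing step; the `Split`/`pack` names and the sample sizes are illustrative only, not part of this patch:

```scala
// Simplified, self-contained sketch of the bin packing used in createNonBucketedReadRDD.
// Splits are sorted by length descending, then appended to the current partition until
// adding the next one would exceed maxSplitBytes, at which point the partition is closed.
object PackingSketch {
  final case class Split(path: String, length: Long)

  def pack(splits: Seq[Split], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Split]] = {
    val sorted = splits.sortBy(_.length)(Ordering[Long].reverse)
    val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[Split]]
    val current = scala.collection.mutable.ArrayBuffer.empty[Split]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toList
      current.clear()
      currentSize = 0L
    }

    sorted.foreach { s =>
      if (currentSize + s.length > maxSplitBytes) closePartition()
      // Each file also pays a nominal "open cost", so many small files don't all
      // land in a single partition.
      currentSize += s.length + openCostInBytes
      current += s
    }
    closePartition()
    partitions.toList
  }

  def main(args: Array[String]): Unit = {
    val splits = Seq(Split("a", 90L), Split("b", 60L), Split("c", 50L), Split("d", 10L))
    // With maxSplitBytes = 100 and openCostInBytes = 4 this prints
    // List(List(Split(a,90)), List(Split(b,60)), List(Split(c,50), Split(d,10))).
    println(pack(splits, maxSplitBytes = 100L, openCostInBytes = 4L))
  }
}
```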

## How was this patch tested?

Existing tests.

Author: Eric Liang <e...@databricks.com>

Closes #14487 from ericl/split-scan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac2a26d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac2a26d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac2a26d0

Branch: refs/heads/master
Commit: ac2a26d09e10c3f462ec773c3ebaa6eedae81ac0
Parents: 9d4e621
Author: Eric Liang <e...@databricks.com>
Authored: Thu Aug 4 11:22:55 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Thu Aug 4 11:22:55 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 521 +++++++++++++++++++
 .../spark/sql/execution/ExistingRDD.scala       | 505 +-----------------
 .../datasources/DataSourceStrategy.scala        |   3 +-
 3 files changed, 525 insertions(+), 504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac2a26d0/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
new file mode 100644
index 0000000..1e749b3
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.Utils
+
+private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+  val relation: BaseRelation
+  val metastoreTableIdentifier: Option[TableIdentifier]
+
+  override val nodeName: String = {
+    s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
+  }
+}
+
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class RowDataSourceScanExec(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val outputPartitioning: Partitioning,
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
+  extends DataSourceScanExec {
+
+  private[sql] override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
+
+  val outputUnsafeRows = relation match {
+    case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
+      !SparkSession.getActiveSession.get.sessionState.conf.getConf(
+        SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+    case _: HadoopFsRelation => true
+    case _ => false
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val unsafeRow = if (outputUnsafeRows) {
+      rdd
+    } else {
+      rdd.mapPartitionsInternal { iter =>
+        val proj = UnsafeProjection.create(schema)
+        iter.map(proj)
+      }
+    }
+
+    val numOutputRows = longMetric("numOutputRows")
+    unsafeRow.map { r =>
+      numOutputRows += 1
+      r
+    }
+  }
+
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+
+    s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" +
+      s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}"
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    rdd :: Nil
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    // PhysicalRDD always just has one input
+    val input = ctx.freshName("input")
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
+    val exprRows = output.zipWithIndex.map{ case (a, i) =>
+      new BoundReference(i, a.dataType, a.nullable)
+    }
+    val row = ctx.freshName("row")
+    ctx.INPUT_ROW = row
+    ctx.currentVars = null
+    val columnsRowInput = exprRows.map(_.genCode(ctx))
+    val inputRow = if (outputUnsafeRows) row else null
+    s"""
+       |while ($input.hasNext()) {
+       |  InternalRow $row = (InternalRow) $input.next();
+       |  $numOutputRows.add(1);
+       |  ${consume(ctx, columnsRowInput, inputRow).trim}
+       |  if (shouldStop()) return;
+       |}
+     """.stripMargin
+  }
+
+  // Ignore rdd when checking results
+  override def sameResult(plan: SparkPlan): Boolean = plan match {
+    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
+    case _ => false
+  }
+}
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan.
+ * @param outputSchema Output schema of the scan.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param dataFilters Data source filters to use for filtering data within partitions.
+ * @param metastoreTableIdentifier
+ */
+private[sql] case class FileSourceScanExec(
+    @transient relation: HadoopFsRelation,
+    output: Seq[Attribute],
+    outputSchema: StructType,
+    partitionFilters: Seq[Expression],
+    dataFilters: Seq[Filter],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
+  extends DataSourceScanExec {
+
+  val supportsBatch = relation.fileFormat.supportBatch(
+    relation.sparkSession, StructType.fromAttributes(output))
+
+  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
+  } else {
+    false
+  }
+
+  override val outputPartitioning: Partitioning = {
+    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
+      relation.bucketSpec
+    } else {
+      None
+    }
+    bucketSpec.map { spec =>
+      val numBuckets = spec.numBuckets
+      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
+        output.find(_.name == n)
+      }
+      if (bucketColumns.size == spec.bucketColumnNames.size) {
+        HashPartitioning(bucketColumns, numBuckets)
+      } else {
+        UnknownPartitioning(0)
+      }
+    }.getOrElse {
+      UnknownPartitioning(0)
+    }
+  }
+
+  // These metadata values make scan plans uniquely identifiable for equality checking.
+  override val metadata: Map[String, String] = Map(
+    "Format" -> relation.fileFormat.toString,
+    "ReadSchema" -> outputSchema.catalogString,
+    "Batched" -> supportsBatch.toString,
+    "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
+    "PushedFilters" -> dataFilters.mkString("[", ", ", "]"),
+    "InputPaths" -> relation.location.paths.mkString(", "))
+
+  private lazy val inputRDD: RDD[InternalRow] = {
+    val selectedPartitions = relation.location.listFiles(partitionFilters)
+
+    val readFile: (PartitionedFile) => Iterator[InternalRow] =
+      relation.fileFormat.buildReaderWithPartitionValues(
+        sparkSession = relation.sparkSession,
+        dataSchema = relation.dataSchema,
+        partitionSchema = relation.partitionSchema,
+        requiredSchema = outputSchema,
+        filters = dataFilters,
+        options = relation.options,
+        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+
+    relation.bucketSpec match {
+      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
+        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
+      case _ =>
+        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
+    }
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  private[sql] override lazy val metrics =
+    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    if (supportsBatch) {
+      // in the case of fallback, this batched scan should never fail because of:
+      // 1) only primitive types are supported
+      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
+      WholeStageCodegenExec(this).execute()
+    } else {
+      val unsafeRows = {
+        val scan = inputRDD
+        if (needsUnsafeRowConversion) {
+          scan.mapPartitionsInternal { iter =>
+            val proj = UnsafeProjection.create(schema)
+            iter.map(proj)
+          }
+        } else {
+          scan
+        }
+      }
+      val numOutputRows = longMetric("numOutputRows")
+      unsafeRows.map { r =>
+        numOutputRows += 1
+        r
+      }
+    }
+  }
+
+  override def simpleString: String = {
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+    s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    if (supportsBatch) {
+      return doProduceVectorized(ctx)
+    }
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    // PhysicalRDD always just has one input
+    val input = ctx.freshName("input")
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
+    val exprRows = output.zipWithIndex.map{ case (a, i) =>
+      new BoundReference(i, a.dataType, a.nullable)
+    }
+    val row = ctx.freshName("row")
+    ctx.INPUT_ROW = row
+    ctx.currentVars = null
+    val columnsRowInput = exprRows.map(_.genCode(ctx))
+    val inputRow = if (needsUnsafeRowConversion) null else row
+    s"""
+       |while ($input.hasNext()) {
+       |  InternalRow $row = (InternalRow) $input.next();
+       |  $numOutputRows.add(1);
+       |  ${consume(ctx, columnsRowInput, inputRow).trim}
+       |  if (shouldStop()) return;
+       |}
+     """.stripMargin
+  }
+
+  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
+  // never requires UnsafeRow as input.
+  private def doProduceVectorized(ctx: CodegenContext): String = {
+    val input = ctx.freshName("input")
+    // PhysicalRDD always just has one input
+    ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
+
+    // metrics
+    val numOutputRows = metricTerm(ctx, "numOutputRows")
+    val scanTimeMetric = metricTerm(ctx, "scanTime")
+    val scanTimeTotalNs = ctx.freshName("scanTime")
+    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+    val batch = ctx.freshName("batch")
+    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+    val idx = ctx.freshName("batchIdx")
+    ctx.addMutableState("int", idx, s"$idx = 0;")
+    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
+    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
+      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
+      s"$name = $batch.column($i);"
+    }
+
+    val nextBatch = ctx.freshName("nextBatch")
+    ctx.addNewFunction(nextBatch,
+      s"""
+         |private void $nextBatch() throws java.io.IOException {
+         |  long getBatchStart = System.nanoTime();
+         |  if ($input.hasNext()) {
+         |    $batch = ($columnarBatchClz)$input.next();
+         |    $numOutputRows.add($batch.numRows());
+         |    $idx = 0;
+         |    ${columnAssigns.mkString("", "\n", "\n")}
+         |  }
+         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+         |}""".stripMargin)
+
+    ctx.currentVars = null
+    val rowidx = ctx.freshName("rowIdx")
+    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
+    }
+    s"""
+       |if ($batch == null) {
+       |  $nextBatch();
+       |}
+       |while ($batch != null) {
+       |  int numRows = $batch.numRows();
+       |  while ($idx < numRows) {
+       |    int $rowidx = $idx++;
+       |    ${consume(ctx, columnsBatchInput).trim}
+       |    if (shouldStop()) return;
+       |  }
+       |  $batch = null;
+       |  $nextBatch();
+       |}
+       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+       |$scanTimeTotalNs = 0;
+     """.stripMargin
+  }
+
+  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
+    dataType: DataType, nullable: Boolean): ExprCode = {
+    val javaType = ctx.javaType(dataType)
+    val value = ctx.getValue(columnVar, dataType, ordinal)
+    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
+    val valueVar = ctx.freshName("value")
+    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
+    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
+      s"""
+        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
+        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
+      """
+    } else {
+      s"$javaType ${valueVar} = $value;"
+    }).trim
+    ExprCode(code, isNullVar, valueVar)
+  }
+
+  /**
+   * Create an RDD for bucketed reads.
+   * The non-bucketed variant of this function is [[createNonBucketedReadRDD]].
+   *
+   * The algorithm is pretty simple: each RDD partition being returned should include all the files
+   * with the same bucket id from all the given Hive partitions.
+   *
+   * @param bucketSpec the bucketing spec.
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
+   */
+  private def createBucketedReadRDD(
+      bucketSpec: BucketSpec,
+      readFile: (PartitionedFile) => Iterator[InternalRow],
+      selectedPartitions: Seq[Partition],
+      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
+    val bucketed =
+      selectedPartitions.flatMap { p =>
+        p.files.map { f =>
+          val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
+          PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
+        }
+      }.groupBy { f =>
+        BucketingUtils
+          .getBucketId(new Path(f.filePath).getName)
+          .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
+      }
+
+    val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+      FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
+    }
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+  }
+
+  /**
+   * Create an RDD for non-bucketed reads.
+   * The bucketed variant of this function is [[createBucketedReadRDD]].
+   *
+   * @param readFile a function to read each (part of a) file.
+   * @param selectedPartitions Hive-style partition that are part of the read.
+   * @param fsRelation [[HadoopFsRelation]] associated with the read.
+   */
+  private def createNonBucketedReadRDD(
+      readFile: (PartitionedFile) => Iterator[InternalRow],
+      selectedPartitions: Seq[Partition],
+      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
+    val defaultMaxSplitBytes =
+      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
+    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
+    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
+    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+      s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    val splitFiles = selectedPartitions.flatMap { partition =>
+      partition.files.flatMap { file =>
+        val blockLocations = getBlockLocations(file)
+        if (fsRelation.fileFormat.isSplitable(
+            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
+          (0L until file.getLen by maxSplitBytes).map { offset =>
+            val remaining = file.getLen - offset
+            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
+            val hosts = getBlockHosts(blockLocations, offset, size)
+            PartitionedFile(
+              partition.values, file.getPath.toUri.toString, offset, size, hosts)
+          }
+        } else {
+          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
+          Seq(PartitionedFile(
+            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
+        }
+      }
+    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    /** Close the current partition and move to the next. */
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        val newPartition =
+          FilePartition(
+            partitions.size,
+            currentFiles.toArray.toSeq) // Copy to a new Array.
+        partitions.append(newPartition)
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    // Assign files to partitions using "First Fit Decreasing" (FFD)
+    // TODO: consider adding a slop factor here?
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles.append(file)
+    }
+    closePartition()
+
+    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+  }
+
+  private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
+    case f: LocatedFileStatus => f.getBlockLocations
+    case f => Array.empty[BlockLocation]
+  }
+
+  // Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)`
+  // pair that represents a segment of the same file, find out the block that contains the largest
+  // fraction the segment, and returns location hosts of that block. If no such block can be found,
+  // returns an empty array.
+  private def getBlockHosts(
+      blockLocations: Array[BlockLocation], offset: Long, length: Long): Array[String] = {
+    val candidates = blockLocations.map {
+      // The fragment starts from a position within this block
+      case b if b.getOffset <= offset && offset < b.getOffset + b.getLength =>
+        b.getHosts -> (b.getOffset + b.getLength - offset).min(length)
+
+      // The fragment ends at a position within this block
+      case b if offset <= b.getOffset && offset + length < b.getLength =>
+        b.getHosts -> (offset + length - b.getOffset).min(length)
+
+      // The fragment fully contains this block
+      case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length =>
+        b.getHosts -> b.getLength
+
+      // The fragment doesn't intersect with this block
+      case b =>
+        b.getHosts -> 0L
+    }.filter { case (hosts, size) =>
+      size > 0L
+    }
+
+    if (candidates.isEmpty) {
+      Array.empty[String]
+    } else {
+      val (hosts, _) = candidates.maxBy { case (_, size) => size }
+      hosts
+    }
+  }
+
+  override def sameResult(plan: SparkPlan): Boolean = plan match {
+    case other: FileSourceScanExec =>
+      val thisPredicates = partitionFilters.map(cleanExpression)
+      val otherPredicates = other.partitionFilters.map(cleanExpression)
+      val result = relation == other.relation && metadata == other.metadata &&
+        thisPredicates.length == otherPredicates.length &&
+        thisPredicates.zip(otherPredicates).forall(p => p._1.semanticEquals(p._2))
+      result
+    case _ => false
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ac2a26d0/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 79d9114..b762c16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,26 +17,15 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
-
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Encoder, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.{Encoder, Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.Utils
 
 object RDDConversions {
@@ -189,491 +178,3 @@ private[sql] case class RDDScanExec(
     s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
   }
 }
-
-private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
-  val relation: BaseRelation
-  val metastoreTableIdentifier: Option[TableIdentifier]
-
-  override val nodeName: String = {
-    s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
-  }
-}
-
-/** Physical plan node for scanning data from a relation. */
-private[sql] case class RowDataSourceScanExec(
-    output: Seq[Attribute],
-    rdd: RDD[InternalRow],
-    @transient relation: BaseRelation,
-    override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String],
-    override val metastoreTableIdentifier: Option[TableIdentifier])
-  extends DataSourceScanExec {
-
-  private[sql] override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"))
-
-  val outputUnsafeRows = relation match {
-    case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
-      !SparkSession.getActiveSession.get.sessionState.conf.getConf(
-        SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
-    case _: HadoopFsRelation => true
-    case _ => false
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = if (outputUnsafeRows) {
-      rdd
-    } else {
-      rdd.mapPartitionsInternal { iter =>
-        val proj = UnsafeProjection.create(schema)
-        iter.map(proj)
-      }
-    }
-
-    val numOutputRows = longMetric("numOutputRows")
-    unsafeRow.map { r =>
-      numOutputRows += 1
-      r
-    }
-  }
-
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-
-    s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" +
-      s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}"
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    rdd :: Nil
-  }
-
-  override protected def doProduce(ctx: CodegenContext): String = {
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    // PhysicalRDD always just has one input
-    val input = ctx.freshName("input")
-    ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
-    val exprRows = output.zipWithIndex.map{ case (a, i) =>
-      new BoundReference(i, a.dataType, a.nullable)
-    }
-    val row = ctx.freshName("row")
-    ctx.INPUT_ROW = row
-    ctx.currentVars = null
-    val columnsRowInput = exprRows.map(_.genCode(ctx))
-    val inputRow = if (outputUnsafeRows) row else null
-    s"""
-       |while ($input.hasNext()) {
-       |  InternalRow $row = (InternalRow) $input.next();
-       |  $numOutputRows.add(1);
-       |  ${consume(ctx, columnsRowInput, inputRow).trim}
-       |  if (shouldStop()) return;
-       |}
-     """.stripMargin
-  }
-
-  // Ignore rdd when checking results
-  override def sameResult(plan: SparkPlan): Boolean = plan match {
-    case other: RowDataSourceScanExec => relation == other.relation && metadata == other.metadata
-    case _ => false
-  }
-}
-
-/**
- * Physical plan node for scanning data from HadoopFsRelations.
- *
- * @param relation The file-based relation to scan.
- * @param output Output attributes of the scan.
- * @param outputSchema Output schema of the scan.
- * @param partitionFilters Predicates to use for partition pruning.
- * @param dataFilters Data source filters to use for filtering data within partitions.
- * @param metastoreTableIdentifier
- */
-private[sql] case class FileSourceScanExec(
-    @transient relation: HadoopFsRelation,
-    output: Seq[Attribute],
-    outputSchema: StructType,
-    partitionFilters: Seq[Expression],
-    dataFilters: Seq[Filter],
-    override val metastoreTableIdentifier: Option[TableIdentifier])
-  extends DataSourceScanExec {
-
-  val supportsBatch = relation.fileFormat.supportBatch(
-    relation.sparkSession, StructType.fromAttributes(output))
-
-  val needsUnsafeRowConversion = if (relation.fileFormat.isInstanceOf[ParquetSource]) {
-    SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
-  } else {
-    false
-  }
-
-  override val outputPartitioning: Partitioning = {
-    val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
-      relation.bucketSpec
-    } else {
-      None
-    }
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.flatMap { n =>
-        output.find(_.name == n)
-      }
-      if (bucketColumns.size == spec.bucketColumnNames.size) {
-        HashPartitioning(bucketColumns, numBuckets)
-      } else {
-        UnknownPartitioning(0)
-      }
-    }.getOrElse {
-      UnknownPartitioning(0)
-    }
-  }
-
-  // These metadata values make scan plans uniquely identifiable for equality checking.
-  override val metadata: Map[String, String] = Map(
-    "Format" -> relation.fileFormat.toString,
-    "ReadSchema" -> outputSchema.catalogString,
-    "Batched" -> supportsBatch.toString,
-    "PartitionFilters" -> partitionFilters.mkString("[", ", ", "]"),
-    DataSourceScanExec.PUSHED_FILTERS -> dataFilters.mkString("[", ", ", "]"),
-    DataSourceScanExec.INPUT_PATHS -> relation.location.paths.mkString(", "))
-
-  private lazy val inputRDD: RDD[InternalRow] = {
-    val selectedPartitions = relation.location.listFiles(partitionFilters)
-
-    val readFile: (PartitionedFile) => Iterator[InternalRow] =
-      relation.fileFormat.buildReaderWithPartitionValues(
-        sparkSession = relation.sparkSession,
-        dataSchema = relation.dataSchema,
-        partitionSchema = relation.partitionSchema,
-        requiredSchema = outputSchema,
-        filters = dataFilters,
-        options = relation.options,
-        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
-
-    relation.bucketSpec match {
-      case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
-        createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
-      case _ =>
-        createNonBucketedReadRDD(readFile, selectedPartitions, relation)
-    }
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    inputRDD :: Nil
-  }
-
-  private[sql] override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
-      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    if (supportsBatch) {
-      // in the case of fallback, this batched scan should never fail because of:
-      // 1) only primitive types are supported
-      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
-      WholeStageCodegenExec(this).execute()
-    } else {
-      val unsafeRows = {
-        val scan = inputRDD
-        if (needsUnsafeRowConversion) {
-          scan.mapPartitionsInternal { iter =>
-            val proj = UnsafeProjection.create(schema)
-            iter.map(proj)
-          }
-        } else {
-          scan
-        }
-      }
-      val numOutputRows = longMetric("numOutputRows")
-      unsafeRows.map { r =>
-        numOutputRows += 1
-        r
-      }
-    }
-  }
-
-  override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
-      key + ": " + StringUtils.abbreviate(value, 100)
-    }
-    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
-    s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
-  }
-
-  override protected def doProduce(ctx: CodegenContext): String = {
-    if (supportsBatch) {
-      return doProduceVectorized(ctx)
-    }
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    // PhysicalRDD always just has one input
-    val input = ctx.freshName("input")
-    ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
-    val exprRows = output.zipWithIndex.map{ case (a, i) =>
-      new BoundReference(i, a.dataType, a.nullable)
-    }
-    val row = ctx.freshName("row")
-    ctx.INPUT_ROW = row
-    ctx.currentVars = null
-    val columnsRowInput = exprRows.map(_.genCode(ctx))
-    val inputRow = if (needsUnsafeRowConversion) null else row
-    s"""
-       |while ($input.hasNext()) {
-       |  InternalRow $row = (InternalRow) $input.next();
-       |  $numOutputRows.add(1);
-       |  ${consume(ctx, columnsRowInput, inputRow).trim}
-       |  if (shouldStop()) return;
-       |}
-     """.stripMargin
-  }
-
-  // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
-  // never requires UnsafeRow as input.
-  private def doProduceVectorized(ctx: CodegenContext): String = {
-    val input = ctx.freshName("input")
-    // PhysicalRDD always just has one input
-    ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
-
-    // metrics
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    val scanTimeMetric = metricTerm(ctx, "scanTime")
-    val scanTimeTotalNs = ctx.freshName("scanTime")
-    ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
-
-    val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
-    val batch = ctx.freshName("batch")
-    ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
-
-    val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
-    val idx = ctx.freshName("batchIdx")
-    ctx.addMutableState("int", idx, s"$idx = 0;")
-    val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
-    val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
-      ctx.addMutableState(columnVectorClz, name, s"$name = null;")
-      s"$name = $batch.column($i);"
-    }
-
-    val nextBatch = ctx.freshName("nextBatch")
-    ctx.addNewFunction(nextBatch,
-      s"""
-         |private void $nextBatch() throws java.io.IOException {
-         |  long getBatchStart = System.nanoTime();
-         |  if ($input.hasNext()) {
-         |    $batch = ($columnarBatchClz)$input.next();
-         |    $numOutputRows.add($batch.numRows());
-         |    $idx = 0;
-         |    ${columnAssigns.mkString("", "\n", "\n")}
-         |  }
-         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
-         |}""".stripMargin)
-
-    ctx.currentVars = null
-    val rowidx = ctx.freshName("rowIdx")
-    val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
-      genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
-    }
-    s"""
-       |if ($batch == null) {
-       |  $nextBatch();
-       |}
-       |while ($batch != null) {
-       |  int numRows = $batch.numRows();
-       |  while ($idx < numRows) {
-       |    int $rowidx = $idx++;
-       |    ${consume(ctx, columnsBatchInput).trim}
-       |    if (shouldStop()) return;
-       |  }
-       |  $batch = null;
-       |  $nextBatch();
-       |}
-       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
-       |$scanTimeTotalNs = 0;
-     """.stripMargin
-  }
-
-  private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
-    dataType: DataType, nullable: Boolean): ExprCode = {
-    val javaType = ctx.javaType(dataType)
-    val value = ctx.getValue(columnVar, dataType, ordinal)
-    val isNullVar = if (nullable) { ctx.freshName("isNull") } else { "false" }
-    val valueVar = ctx.freshName("value")
-    val str = s"columnVector[$columnVar, $ordinal, ${dataType.simpleString}]"
-    val code = s"${ctx.registerComment(str)}\n" + (if (nullable) {
-      s"""
-        boolean ${isNullVar} = ${columnVar}.isNullAt($ordinal);
-        $javaType ${valueVar} = ${isNullVar} ? ${ctx.defaultValue(dataType)} : ($value);
-      """
-    } else {
-      s"$javaType ${valueVar} = $value;"
-    }).trim
-    ExprCode(code, isNullVar, valueVar)
-  }
-
-  /**
-   * Create an RDD for bucketed reads.
-   * The non-bucketed variant of this function is [[createNonBucketedReadRDD]].
-   *
-   * The algorithm is pretty simple: each RDD partition being returned should include all the files
-   * with the same bucket id from all the given Hive partitions.
-   *
-   * @param bucketSpec the bucketing spec.
-   * @param readFile a function to read each (part of a) file.
-   * @param selectedPartitions Hive-style partition that are part of the read.
-   * @param fsRelation [[HadoopFsRelation]] associated with the read.
-   */
-  private def createBucketedReadRDD(
-      bucketSpec: BucketSpec,
-      readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[Partition],
-      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
-    logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
-    val bucketed =
-      selectedPartitions.flatMap { p =>
-        p.files.map { f =>
-          val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
-          PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
-        }
-      }.groupBy { f =>
-        BucketingUtils
-          .getBucketId(new Path(f.filePath).getName)
-          .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
-      }
-
-    val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
-      FilePartition(bucketId, bucketed.getOrElse(bucketId, Nil))
-    }
-
-    new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
-  }
-
-  /**
-   * Create an RDD for non-bucketed reads.
-   * The bucketed variant of this function is [[createBucketedReadRDD]].
-   *
-   * @param readFile a function to read each (part of a) file.
-   * @param selectedPartitions Hive-style partition that are part of the read.
-   * @param fsRelation [[HadoopFsRelation]] associated with the read.
-   */
-  private def createNonBucketedReadRDD(
-      readFile: (PartitionedFile) => Iterator[InternalRow],
-      selectedPartitions: Seq[Partition],
-      fsRelation: HadoopFsRelation): RDD[InternalRow] = {
-    val defaultMaxSplitBytes =
-      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
-    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
-    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
-    val bytesPerCore = totalBytes / defaultParallelism
-
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
-    logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
-      s"open cost is considered as scanning $openCostInBytes bytes.")
-
-    val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.flatMap { file =>
-        val blockLocations = getBlockLocations(file)
-        if (fsRelation.fileFormat.isSplitable(
-            fsRelation.sparkSession, fsRelation.options, file.getPath)) {
-          (0L until file.getLen by maxSplitBytes).map { offset =>
-            val remaining = file.getLen - offset
-            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
-            val hosts = getBlockHosts(blockLocations, offset, size)
-            PartitionedFile(
-              partition.values, file.getPath.toUri.toString, offset, size, hosts)
-          }
-        } else {
-          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
-          Seq(PartitionedFile(
-            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
-        }
-      }
-    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-
-    val partitions = new ArrayBuffer[FilePartition]
-    val currentFiles = new ArrayBuffer[PartitionedFile]
-    var currentSize = 0L
-
-    /** Close the current partition and move to the next. */
-    def closePartition(): Unit = {
-      if (currentFiles.nonEmpty) {
-        val newPartition =
-          FilePartition(
-            partitions.size,
-            currentFiles.toArray.toSeq) // Copy to a new Array.
-        partitions.append(newPartition)
-      }
-      currentFiles.clear()
-      currentSize = 0
-    }
-
-    // Assign files to partitions using "First Fit Decreasing" (FFD)
-    // TODO: consider adding a slop factor here?
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
-        closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
-      currentFiles.append(file)
-    }
-    closePartition()
-
-    new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
-  }
-
-  private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
-    case f: LocatedFileStatus => f.getBlockLocations
-    case f => Array.empty[BlockLocation]
-  }
-
-  // Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)`
-  // pair that represents a segment of the same file, find out the block that contains the largest
-  // fraction the segment, and returns location hosts of that block. If no such block can be found,
-  // returns an empty array.
-  private def getBlockHosts(
-      blockLocations: Array[BlockLocation], offset: Long, length: Long): Array[String] = {
-    val candidates = blockLocations.map {
-      // The fragment starts from a position within this block
-      case b if b.getOffset <= offset && offset < b.getOffset + b.getLength =>
-        b.getHosts -> (b.getOffset + b.getLength - offset).min(length)
-
-      // The fragment ends at a position within this block
-      case b if offset <= b.getOffset && offset + length < b.getLength =>
-        b.getHosts -> (offset + length - b.getOffset).min(length)
-
-      // The fragment fully contains this block
-      case b if offset <= b.getOffset && b.getOffset + b.getLength <= offset + length =>
-        b.getHosts -> b.getLength
-
-      // The fragment doesn't intersect with this block
-      case b =>
-        b.getHosts -> 0L
-    }.filter { case (hosts, size) =>
-      size > 0L
-    }
-
-    if (candidates.isEmpty) {
-      Array.empty[String]
-    } else {
-      val (hosts, _) = candidates.maxBy { case (_, size) => size }
-      hosts
-    }
-  }
-
-  override def sameResult(plan: SparkPlan): Boolean = plan match {
-    case other: FileSourceScanExec =>
-      val thisPredicates = partitionFilters.map(cleanExpression)
-      val otherPredicates = other.partitionFilters.map(cleanExpression)
-      val result = relation == other.relation && metadata == other.metadata &&
-        thisPredicates.length == otherPredicates.length &&
-        thisPredicates.zip(otherPredicates).forall(p => p._1.semanticEquals(p._2))
-      result
-    case _ => false
-  }
-}
-
-private[sql] object DataSourceScanExec {
-  // Metadata keys
-  val INPUT_PATHS = "InputPaths"
-  val PUSHED_FILTERS = "PushedFilters"
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ac2a26d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 52b1677..ed8ccca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
-import org.apache.spark.sql.execution.DataSourceScanExec.PUSHED_FILTERS
 import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -361,7 +360,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         val markedFilters = for (filter <- pushedFilters) yield {
             if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
         }
-        pairs += (PUSHED_FILTERS -> markedFilters.mkString("[", ", ", "]"))
+        pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]"))
       }
       pairs.toMap
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
