Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e24b97650 -> 78f168e97


[SPARK-9615] [SPARK-9616] [SQL] [MLLIB] Bugs related to FrequentItems when 
merging and with Tungsten

In short:
1- FrequentItems should not use the InternalRow representation, because the 
keys in the map get messed up. For example, every key in the Map correspond to 
the very last element observed in the partition, when the elements are strings.

2- Merging two partitions had a bug:

**Existing behavior with size 3**
Partition A -> Map(1 -> 3, 2 -> 3, 3 -> 4)
Partition B -> Map(4 -> 25)
Result -> Map()

**Correct Behavior:**
Partition A -> Map(1 -> 3, 2 -> 3, 3 -> 4)
Partition B -> Map(4 -> 25)
Result -> Map(3 -> 1, 4 -> 22)

cc mengxr rxin JoshRosen

Author: Burak Yavuz <[email protected]>

Closes #7945 from brkyvz/freq-fix and squashes the following commits:

07fa001 [Burak Yavuz] address 2
1dc61a8 [Burak Yavuz] address 1
506753e [Burak Yavuz] fixed and added reg test
47bfd50 [Burak Yavuz] pushing

(cherry picked from commit 98e69467d4fda2c26a951409b5b7c6f1e9345ce4)
Signed-off-by: Xiangrui Meng <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78f168e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78f168e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78f168e9

Branch: refs/heads/branch-1.5
Commit: 78f168e97238316e33ce0d3763ba655603928c32
Parents: e24b976
Author: Burak Yavuz <[email protected]>
Authored: Thu Aug 6 10:29:40 2015 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Thu Aug 6 10:29:47 2015 -0700

----------------------------------------------------------------------
 .../sql/execution/stat/FrequentItems.scala      | 26 +++++++++++---------
 .../apache/spark/sql/DataFrameStatSuite.scala   | 24 +++++++++++++++---
 2 files changed, 36 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78f168e9/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index 9329148..db46302 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -20,17 +20,15 @@ package org.apache.spark.sql.execution.stat
 import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.{Row, Column, DataFrame}
 
 private[sql] object FrequentItems extends Logging {
 
   /** A helper class wrapping `MutableMap[Any, Long]` for simplicity. */
   private class FreqItemCounter(size: Int) extends Serializable {
     val baseMap: MutableMap[Any, Long] = MutableMap.empty[Any, Long]
-
     /**
      * Add a new example to the counts if it exists, otherwise deduct the count
      * from existing items.
@@ -42,9 +40,15 @@ private[sql] object FrequentItems extends Logging {
         if (baseMap.size < size) {
           baseMap += key -> count
         } else {
-          // TODO: Make this more efficient... A flatMap?
-          baseMap.retain((k, v) => v > count)
-          baseMap.transform((k, v) => v - count)
+          val minCount = baseMap.values.min
+          val remainder = count - minCount
+          if (remainder >= 0) {
+            baseMap += key -> count // something will get kicked out, so we 
can add this
+            baseMap.retain((k, v) => v > minCount)
+            baseMap.transform((k, v) => v - minCount)
+          } else {
+            baseMap.transform((k, v) => v - count)
+          }
         }
       }
       this
@@ -90,12 +94,12 @@ private[sql] object FrequentItems extends Logging {
       (name, originalSchema.fields(index).dataType)
     }.toArray
 
-    val freqItems = df.select(cols.map(Column(_)) : 
_*).queryExecution.toRdd.aggregate(countMaps)(
+    val freqItems = df.select(cols.map(Column(_)) : 
_*).rdd.aggregate(countMaps)(
       seqOp = (counts, row) => {
         var i = 0
         while (i < numCols) {
           val thisMap = counts(i)
-          val key = row.get(i, colInfo(i)._2)
+          val key = row.get(i)
           thisMap.add(key, 1L)
           i += 1
         }
@@ -110,13 +114,13 @@ private[sql] object FrequentItems extends Logging {
         baseCounts
       }
     )
-    val justItems = freqItems.map(m => m.baseMap.keys.toArray).map(new 
GenericArrayData(_))
-    val resultRow = InternalRow(justItems : _*)
+    val justItems = freqItems.map(m => m.baseMap.keys.toArray)
+    val resultRow = Row(justItems : _*)
     // append frequent Items to the column name for easy debugging
     val outputCols = colInfo.map { v =>
       StructField(v._1 + "_freqItems", ArrayType(v._2, false))
     }
     val schema = StructType(outputCols).toAttributes
-    new DataFrame(df.sqlContext, LocalRelation(schema, Seq(resultRow)))
+    new DataFrame(df.sqlContext, LocalRelation.fromExternalRows(schema, 
Seq(resultRow)))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78f168e9/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 07a675e..0e7659f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -123,12 +123,30 @@ class DataFrameStatSuite extends QueryTest {
 
     val results = df.stat.freqItems(Array("numbers", "letters"), 0.1)
     val items = results.collect().head
-    items.getSeq[Int](0) should contain (1)
-    items.getSeq[String](1) should contain (toLetter(1))
+    assert(items.getSeq[Int](0).contains(1))
+    assert(items.getSeq[String](1).contains(toLetter(1)))
 
     val singleColResults = df.stat.freqItems(Array("negDoubles"), 0.1)
     val items2 = singleColResults.collect().head
-    items2.getSeq[Double](0) should contain (-1.0)
+    assert(items2.getSeq[Double](0).contains(-1.0))
+  }
+
+  test("Frequent Items 2") {
+    val rows = sqlCtx.sparkContext.parallelize(Seq.empty[Int], 4)
+    // this is a regression test, where when merging partitions, we omitted 
values with higher
+    // counts than those that existed in the map when the map was full. This 
test should also fail
+    // if anything like SPARK-9614 is observed once again
+    val df = rows.mapPartitionsWithIndex { (idx, iter) =>
+      if (idx == 3) { // must come from one of the later merges, therefore 
higher partition index
+        Iterator("3", "3", "3", "3", "3")
+      } else {
+        Iterator("0", "1", "2", "3", "4")
+      }
+    }.toDF("a")
+    val results = df.stat.freqItems(Array("a"), 0.25)
+    val items = results.collect().head.getSeq[String](0)
+    assert(items.contains("3"))
+    assert(items.length === 1)
   }
 
   test("sampleBy") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to