Repository: spark
Updated Branches:
  refs/heads/master 57084e0c7 -> 28d944e86


[SPARK-9000] [MLLIB] Support generic item types in PrefixSpan

mengxr Please review after #7818 merges and master is rebased.

Continues work by rikima

Closes #7400

Author: Feynman Liang <fli...@databricks.com>
Author: masaki rikitoku <rikima3...@gmail.com>

Closes #7837 from feynmanliang/SPARK-7400-genericItems and squashes the 
following commits:

8b2c756 [Feynman Liang] Remove orig
92443c8 [Feynman Liang] Style fixes
42c6349 [Feynman Liang] Style fix
14e67fc [Feynman Liang] Generic prefixSpan itemtypes
b3b21e0 [Feynman Liang] Initial support for generic itemtype in public api
b86e0d5 [masaki rikitoku] modify to support generic item type


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28d944e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28d944e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28d944e8

Branch: refs/heads/master
Commit: 28d944e86d066eb4c651dd803f0b022605ed644e
Parents: 57084e0
Author: Feynman Liang <fli...@databricks.com>
Authored: Sat Aug 1 23:11:25 2015 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Sat Aug 1 23:11:25 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/mllib/fpm/PrefixSpan.scala |  40 ++++++-
 .../spark/mllib/fpm/PrefixSpanSuite.scala       | 104 +++++++++++++++++--
 2 files changed, 132 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28d944e8/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 22b4ddb..c1761c3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.fpm
 
 import scala.collection.mutable.ArrayBuilder
+import scala.reflect.ClassTag
 
 import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
@@ -90,15 +91,44 @@ class PrefixSpan private (
   }
 
   /**
-   * Find the complete set of sequential patterns in the input sequences.
+   * Find the complete set of sequential patterns in the input sequences of 
itemsets.
+   * @param data ordered sequences of itemsets.
+   * @return (sequential itemset pattern, count) tuples
+   */
+  def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): 
RDD[(Array[Array[Item]], Long)] = {
+    val itemToInt = data.aggregate(Set[Item]())(
+      seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet },
+      combOp = { _ ++ _ }
+    ).zipWithIndex.toMap
+    val intToItem = Map() ++ (itemToInt.map { case (k, v) => (v, k) })
+
+    val dataInternalRepr = data.map { seq =>
+      seq.map(itemset => itemset.map(itemToInt)).reduce((a, b) => a ++ 
(DELIMITER +: b))
+    }
+    val results = run(dataInternalRepr)
+
+    def toPublicRepr(pattern: Iterable[Int]): List[Array[Item]] = {
+      pattern.span(_ != DELIMITER) match {
+        case (x, xs) if xs.size > 1 => x.map(intToItem).toArray :: 
toPublicRepr(xs.tail)
+        case (x, xs) => List(x.map(intToItem).toArray)
+      }
+    }
+    results.map { case (seq: Array[Int], count: Long) =>
+      (toPublicRepr(seq).toArray, count)
+    }
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences. This 
method utilizes
+   * the internal representation of sequences as Array[Int], where each itemset 
is represented by
+   * a contiguous run of non-negative integers and itemsets are separated 
by [[DELIMITER]].
    * @param data ordered sequences of itemsets. Items are represented by 
non-negative integers.
-   *                  Each itemset has one or more items and is delimited by 
[[DELIMITER]].
+   *             Each itemset has one or more items and is delimited by 
[[DELIMITER]].
    * @return a set of sequential pattern pairs,
    *         the key of pair is pattern (a list of elements),
    *         the value of pair is the pattern's count.
    */
-  // TODO: generalize to arbitrary item-types and use mapping to Ints for 
internal algorithm
-  def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+  private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
     val sc = data.sparkContext
 
     if (data.getStorageLevel == StorageLevel.NONE) {
@@ -260,7 +290,7 @@ class PrefixSpan private (
 private[fpm] object PrefixSpan {
   private[fpm] val DELIMITER = -1
 
-  /** Splits a sequence of itemsets delimited by [[DELIMITER]]. */
+  /** Splits an array of itemsets delimited by [[DELIMITER]]. */
   private[fpm] def splitSequence(sequence: List[Int]): List[Set[Int]] = {
     sequence.span(_ != DELIMITER) match {
       case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail)

http://git-wip-us.apache.org/repos/asf/spark/blob/28d944e8/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 457f326..d87f61e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 
 class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
 
-  test("PrefixSpan using Integer type, singleton itemsets") {
+  test("PrefixSpan internal (integer seq, -1 delim) run, singleton itemsets") {
 
     /*
       library("arulesSequences")
@@ -69,7 +69,7 @@ class PrefixSpanSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       (Array(4, -1, 5), 2L),
       (Array(5), 3L)
     )
-    compareResults(expectedValue1, result1.collect())
+    compareInternalResults(expectedValue1, result1.collect())
 
     prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
     val result2 = prefixspan.run(rdd)
@@ -80,7 +80,7 @@ class PrefixSpanSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       (Array(4), 4L),
       (Array(5), 3L)
     )
-    compareResults(expectedValue2, result2.collect())
+    compareInternalResults(expectedValue2, result2.collect())
 
     prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
     val result3 = prefixspan.run(rdd)
@@ -100,10 +100,10 @@ class PrefixSpanSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       (Array(4, -1, 5), 2L),
       (Array(5), 3L)
     )
-    compareResults(expectedValue3, result3.collect())
+    compareInternalResults(expectedValue3, result3.collect())
   }
 
-  test("PrefixSpan using Integer type, variable-size itemsets") {
+  test("PrefixSpan internal (integer seq, -1 delim) run, variable-size 
itemsets") {
     val sequences = Array(
       Array(1, -1, 1, 2, 3, -1, 1, 3, -1, 4, -1, 3, 6),
       Array(1, 4, -1, 3, -1, 2, 3, -1, 1, 5),
@@ -254,10 +254,100 @@ class PrefixSpanSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       (Array(1, -1, 2, 3, -1, 1), 2L),
       (Array(1, -1, 2, -1, 1), 2L))
 
-    compareResults(expectedValue, result.collect())
+    compareInternalResults(expectedValue, result.collect())
   }
 
-  private def compareResults(
+  test("PrefixSpan Integer type, variable-size itemsets") {
+    val sequences = Seq(
+      Array(Array(1, 2), Array(3)),
+      Array(Array(1), Array(3, 2), Array(1, 2)),
+      Array(Array(1, 2), Array(5)),
+      Array(Array(6)))
+    val rdd = sc.parallelize(sequences, 2).cache()
+
+    val prefixspan = new PrefixSpan()
+      .setMinSupport(0.5)
+      .setMaxPatternLength(5)
+
+    /*
+      To verify results, create file "prefixSpanSeqs2" with content
+      (format = (transactionID, idxInTransaction, numItemsinItemset, itemset)):
+        1 1 2 1 2
+        1 2 1 3
+        2 1 1 1
+        2 2 2 3 2
+        2 3 2 1 2
+        3 1 2 1 2
+        3 2 1 5
+        4 1 1 6
+      In R, run:
+        library("arulesSequences")
+        prefixSpanSeqs = read_baskets("prefixSpanSeqs2", info = 
c("sequenceID","eventID","SIZE"))
+        freqItemSeq = cspade(prefixSpanSeqs,
+                             parameter = list(support = 0.5, maxlen = 5))
+        resSeq = as(freqItemSeq, "data.frame")
+        resSeq
+
+           sequence support
+        1     <{1}>    0.75
+        2     <{2}>    0.75
+        3     <{3}>    0.50
+        4 <{1},{3}>    0.50
+        5   <{1,2}>    0.75
+     */
+
+    val result = prefixspan.run(rdd)
+    val expected = Array(
+      (Array(Array(1)), 3L),
+      (Array(Array(2)), 3L),
+      (Array(Array(3)), 2L),
+      (Array(Array(1), Array(3)), 2L),
+      (Array(Array(1, 2)), 3L)
+    )
+    compareResults(expected, result.collect())
+  }
+
+  test("PrefixSpan String type, variable-size itemsets") {
+    // This is the same test as "PrefixSpan Integer type, variable-size itemsets" 
except
+    // mapped to Strings
+    val intToString = (1 to 6).zip(Seq("a", "b", "c", "d", "e", "f")).toMap
+    val sequences = Seq(
+      Array(Array(1, 2), Array(3)),
+      Array(Array(1), Array(3, 2), Array(1, 2)),
+      Array(Array(1, 2), Array(5)),
+      Array(Array(6))).map(seq => seq.map(itemSet => itemSet.map(intToString)))
+    val rdd = sc.parallelize(sequences, 2).cache()
+
+    val prefixspan = new PrefixSpan()
+      .setMinSupport(0.5)
+      .setMaxPatternLength(5)
+
+    val result = prefixspan.run(rdd)
+    val expected = Array(
+      (Array(Array(1)), 3L),
+      (Array(Array(2)), 3L),
+      (Array(Array(3)), 2L),
+      (Array(Array(1), Array(3)), 2L),
+      (Array(Array(1, 2)), 3L)
+    ).map { case (pattern, count) =>
+      (pattern.map(itemSet => itemSet.map(intToString)), count)
+    }
+    compareResults(expected, result.collect())
+  }
+
+  private def compareResults[Item](
+      expectedValue: Array[(Array[Array[Item]], Long)],
+      actualValue: Array[(Array[Array[Item]], Long)]): Unit = {
+    val expectedSet = expectedValue.map { case (pattern: Array[Array[Item]], 
count: Long) =>
+      (pattern.map(itemSet => itemSet.toSet).toSeq, count)
+    }.toSet
+    val actualSet = actualValue.map { case (pattern: Array[Array[Item]], 
count: Long) =>
+      (pattern.map(itemSet => itemSet.toSet).toSeq, count)
+    }.toSet
+    assert(expectedSet === actualSet)
+  }
+
+  private def compareInternalResults(
       expectedValue: Array[(Array[Int], Long)],
       actualValue: Array[(Array[Int], Long)]): Unit = {
     val expectedSet = expectedValue.map(x => (x._1.toSeq, x._2)).toSet


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to