Repository: spark
Updated Branches:
  refs/heads/master 8eafa2aeb -> 66924ffa6


[SPARK-9527] [MLLIB] add PrefixSpanModel and make PrefixSpan Java friendly

1. Use `PrefixSpanModel` to wrap the frequent sequences.
2. Define `FreqSequence` to wrap each frequent sequence, which contains a 
Java-friendly method `javaSequence`
3. Overload `run` for Java users.
4. Added a unit test in Java to check Java compatibility.

zhangjiajin feynmanliang

Author: Xiangrui Meng <m...@databricks.com>

Closes #7869 from mengxr/SPARK-9527 and squashes the following commits:

4345594 [Xiangrui Meng] add PrefixSpanModel and make PrefixSpan Java friendly


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66924ffa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66924ffa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66924ffa

Branch: refs/heads/master
Commit: 66924ffa6bdb8e0df1b90b789cb7ad443377e729
Parents: 8eafa2a
Author: Xiangrui Meng <m...@databricks.com>
Authored: Sun Aug 2 11:50:17 2015 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Sun Aug 2 11:50:17 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 52 +++++++++++++--
 .../spark/mllib/fpm/JavaPrefixSpanSuite.java    | 67 ++++++++++++++++++++
 .../spark/mllib/fpm/PrefixSpanSuite.scala       |  8 +--
 3 files changed, 118 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/66924ffa/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index c1761c3..9eaf733 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -17,11 +17,16 @@
 
 package org.apache.spark.mllib.fpm
 
+import java.{lang => jl, util => ju}
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuilder
 import scala.reflect.ClassTag
 
 import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -93,9 +98,9 @@ class PrefixSpan private (
   /**
    * Find the complete set of sequential patterns in the input sequences of 
itemsets.
    * @param data ordered sequences of itemsets.
-   * @return (sequential itemset pattern, count) tuples
+   * @return a [[PrefixSpanModel]] that contains the frequent sequences
    */
-  def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): 
RDD[(Array[Array[Item]], Long)] = {
+  def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): 
PrefixSpanModel[Item] = {
     val itemToInt = data.aggregate(Set[Item]())(
       seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet },
       combOp = { _ ++ _ }
@@ -113,9 +118,25 @@ class PrefixSpan private (
         case (x, xs) => List(x.map(intToItem).toArray)
       }
     }
-    results.map { case (seq: Array[Int], count: Long) =>
-      (toPublicRepr(seq).toArray, count)
+    val freqSequences = results.map { case (seq: Array[Int], count: Long) =>
+      new FreqSequence[Item](toPublicRepr(seq).toArray, count)
     }
+    new PrefixSpanModel[Item](freqSequences)
+  }
+
+  /**
+   * A Java-friendly version of [[run()]] that reads sequences from a 
[[JavaRDD]] and returns
+   * frequent sequences in a [[PrefixSpanModel]].
+   * @param data ordered sequences of itemsets stored as Java Iterable of 
Iterables
+   * @tparam Item item type
+   * @tparam Itemset itemset type, which is an Iterable of Items
+   * @tparam Sequence sequence type, which is an Iterable of Itemsets
+   * @return a [[PrefixSpanModel]] that contains the frequent sequences
+   */
+  def run[Item, Itemset <: jl.Iterable[Item], Sequence <: 
jl.Iterable[Itemset]](
+      data: JavaRDD[Sequence]): PrefixSpanModel[Item] = {
+    implicit val tag = fakeClassTag[Item]
+    run(data.rdd.map(_.asScala.map(_.asScala.toArray).toArray))
   }
 
   /**
@@ -287,7 +308,7 @@ class PrefixSpan private (
 
 }
 
-private[fpm] object PrefixSpan {
+object PrefixSpan {
   private[fpm] val DELIMITER = -1
 
   /** Splits an array of itemsets delimited by [[DELIMITER]]. */
@@ -313,4 +334,25 @@ private[fpm] object PrefixSpan {
     // TODO: improve complexity by using partial prefixes, considering one 
item at a time
     itemSet.subsets.filter(_ != Set.empty[Int])
   }
+
+  /**
+   * Represents a frequence sequence.
+   * @param sequence a sequence of itemsets stored as an Array of Arrays
+   * @param freq frequency
+   * @tparam Item item type
+   */
+  class FreqSequence[Item](val sequence: Array[Array[Item]], val freq: Long) 
extends Serializable {
+    /**
+     * Returns sequence as a Java List of lists for Java users.
+     */
+    def javaSequence: ju.List[ju.List[Item]] = 
sequence.map(_.toList.asJava).toList.asJava
+  }
 }
+
+/**
+ * Model fitted by [[PrefixSpan]]
+ * @param freqSequences frequent sequences
+ * @tparam Item item type
+ */
+class PrefixSpanModel[Item](val freqSequences: 
RDD[PrefixSpan.FreqSequence[Item]])
+  extends Serializable

http://git-wip-us.apache.org/repos/asf/spark/blob/66924ffa/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaPrefixSpanSuite.java
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaPrefixSpanSuite.java 
b/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaPrefixSpanSuite.java
new file mode 100644
index 0000000..34daf5f
--- /dev/null
+++ b/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaPrefixSpanSuite.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.fpm;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.fpm.PrefixSpan.FreqSequence;
+
+public class JavaPrefixSpanSuite {
+  private transient JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+    sc = new JavaSparkContext("local", "JavaPrefixSpan");
+  }
+
+  @After
+  public void tearDown() {
+    sc.stop();
+    sc = null;
+  }
+
+  @Test
+  public void runPrefixSpan() {
+    JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList(
+      Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
+      Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 
2)),
+      Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
+      Arrays.asList(Arrays.asList(6))
+    ), 2);
+    PrefixSpan prefixSpan = new PrefixSpan()
+      .setMinSupport(0.5)
+      .setMaxPatternLength(5);
+    PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
+    JavaRDD<FreqSequence<Integer>> freqSeqs = 
model.freqSequences().toJavaRDD();
+    List<FreqSequence<Integer>> localFreqSeqs = freqSeqs.collect();
+    Assert.assertEquals(5, localFreqSeqs.size());
+    // Check that each frequent sequence could be materialized.
+    for (PrefixSpan.FreqSequence<Integer> freqSeq: localFreqSeqs) {
+      List<List<Integer>> seq = freqSeq.javaSequence();
+      long freq = freqSeq.freq();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/66924ffa/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index d87f61e..0ae48d6 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -296,7 +296,7 @@ class PrefixSpanSuite extends SparkFunSuite with 
MLlibTestSparkContext {
         5   <{1,2}>    0.75
      */
 
-    val result = prefixspan.run(rdd)
+    val model = prefixspan.run(rdd)
     val expected = Array(
       (Array(Array(1)), 3L),
       (Array(Array(2)), 3L),
@@ -304,7 +304,7 @@ class PrefixSpanSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       (Array(Array(1), Array(3)), 2L),
       (Array(Array(1, 2)), 3L)
     )
-    compareResults(expected, result.collect())
+    compareResults(expected, model.freqSequences.collect().map(x => 
(x.sequence, x.freq)))
   }
 
   test("PrefixSpan String type, variable-size itemsets") {
@@ -322,7 +322,7 @@ class PrefixSpanSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       .setMinSupport(0.5)
       .setMaxPatternLength(5)
 
-    val result = prefixspan.run(rdd)
+    val model = prefixspan.run(rdd)
     val expected = Array(
       (Array(Array(1)), 3L),
       (Array(Array(2)), 3L),
@@ -332,7 +332,7 @@ class PrefixSpanSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     ).map { case (pattern, count) =>
       (pattern.map(itemSet => itemSet.map(intToString)), count)
     }
-    compareResults(expected, result.collect())
+    compareResults(expected, model.freqSequences.collect().map(x => 
(x.sequence, x.freq)))
   }
 
   private def compareResults[Item](


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to