Repository: spark Updated Branches: refs/heads/master 8eafa2aeb -> 66924ffa6
[SPARK-9527] [MLLIB] add PrefixSpanModel and make PrefixSpan Java friendly 1. Use `PrefixSpanModel` to wrap the frequent sequences. 2. Define `FreqSequence` to wrap each frequent sequence, which contains a Java-friendly method `javaSequence` 3. Overload `run` for Java users. 4. Added a unit test in Java to check Java compatibility. zhangjiajin feynmanliang Author: Xiangrui Meng <m...@databricks.com> Closes #7869 from mengxr/SPARK-9527 and squashes the following commits: 4345594 [Xiangrui Meng] add PrefixSpanModel and make PrefixSpan Java friendly Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66924ffa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66924ffa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66924ffa Branch: refs/heads/master Commit: 66924ffa6bdb8e0df1b90b789cb7ad443377e729 Parents: 8eafa2a Author: Xiangrui Meng <m...@databricks.com> Authored: Sun Aug 2 11:50:17 2015 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Sun Aug 2 11:50:17 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 52 +++++++++++++-- .../spark/mllib/fpm/JavaPrefixSpanSuite.java | 67 ++++++++++++++++++++ .../spark/mllib/fpm/PrefixSpanSuite.scala | 8 +-- 3 files changed, 118 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/66924ffa/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index c1761c3..9eaf733 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -17,11 +17,16 @@ package 
org.apache.spark.mllib.fpm +import java.{lang => jl, util => ju} + +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuilder import scala.reflect.ClassTag import org.apache.spark.Logging import org.apache.spark.annotation.Experimental +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -93,9 +98,9 @@ class PrefixSpan private ( /** * Find the complete set of sequential patterns in the input sequences of itemsets. * @param data ordered sequences of itemsets. - * @return (sequential itemset pattern, count) tuples + * @return a [[PrefixSpanModel]] that contains the frequent sequences */ - def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): RDD[(Array[Array[Item]], Long)] = { + def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): PrefixSpanModel[Item] = { val itemToInt = data.aggregate(Set[Item]())( seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet }, combOp = { _ ++ _ } @@ -113,9 +118,25 @@ class PrefixSpan private ( case (x, xs) => List(x.map(intToItem).toArray) } } - results.map { case (seq: Array[Int], count: Long) => - (toPublicRepr(seq).toArray, count) + val freqSequences = results.map { case (seq: Array[Int], count: Long) => + new FreqSequence[Item](toPublicRepr(seq).toArray, count) } + new PrefixSpanModel[Item](freqSequences) + } + + /** + * A Java-friendly version of [[run()]] that reads sequences from a [[JavaRDD]] and returns + * frequent sequences in a [[PrefixSpanModel]]. 
+ * @param data ordered sequences of itemsets stored as Java Iterable of Iterables + * @tparam Item item type + * @tparam Itemset itemset type, which is an Iterable of Items + * @tparam Sequence sequence type, which is an Iterable of Itemsets + * @return a [[PrefixSpanModel]] that contains the frequent sequences + */ + def run[Item, Itemset <: jl.Iterable[Item], Sequence <: jl.Iterable[Itemset]]( + data: JavaRDD[Sequence]): PrefixSpanModel[Item] = { + implicit val tag = fakeClassTag[Item] + run(data.rdd.map(_.asScala.map(_.asScala.toArray).toArray)) } /** @@ -287,7 +308,7 @@ class PrefixSpan private ( } -private[fpm] object PrefixSpan { +object PrefixSpan { private[fpm] val DELIMITER = -1 /** Splits an array of itemsets delimited by [[DELIMITER]]. */ @@ -313,4 +334,25 @@ private[fpm] object PrefixSpan { // TODO: improve complexity by using partial prefixes, considering one item at a time itemSet.subsets.filter(_ != Set.empty[Int]) } + + /** + * Represents a frequent sequence. + * @param sequence a sequence of itemsets stored as an Array of Arrays + * @param freq frequency + * @tparam Item item type + */ + class FreqSequence[Item](val sequence: Array[Array[Item]], val freq: Long) extends Serializable { + /** + * Returns sequence as a Java List of lists for Java users.
+ */ + def javaSequence: ju.List[ju.List[Item]] = sequence.map(_.toList.asJava).toList.asJava + } } + +/** + * Model fitted by [[PrefixSpan]] + * @param freqSequences frequent sequences + * @tparam Item item type + */ +class PrefixSpanModel[Item](val freqSequences: RDD[PrefixSpan.FreqSequence[Item]]) + extends Serializable http://git-wip-us.apache.org/repos/asf/spark/blob/66924ffa/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaPrefixSpanSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaPrefixSpanSuite.java b/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaPrefixSpanSuite.java new file mode 100644 index 0000000..34daf5f --- /dev/null +++ b/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaPrefixSpanSuite.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.fpm; + +import java.util.Arrays; +import java.util.List; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.mllib.fpm.PrefixSpan.FreqSequence; + +public class JavaPrefixSpanSuite { + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaPrefixSpan"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + } + + @Test + public void runPrefixSpan() { + JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList( + Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)), + Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)), + Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)), + Arrays.asList(Arrays.asList(6)) + ), 2); + PrefixSpan prefixSpan = new PrefixSpan() + .setMinSupport(0.5) + .setMaxPatternLength(5); + PrefixSpanModel<Integer> model = prefixSpan.run(sequences); + JavaRDD<FreqSequence<Integer>> freqSeqs = model.freqSequences().toJavaRDD(); + List<FreqSequence<Integer>> localFreqSeqs = freqSeqs.collect(); + Assert.assertEquals(5, localFreqSeqs.size()); + // Check that each frequent sequence could be materialized. 
+ for (PrefixSpan.FreqSequence<Integer> freqSeq: localFreqSeqs) { + List<List<Integer>> seq = freqSeq.javaSequence(); + long freq = freqSeq.freq(); + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/66924ffa/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index d87f61e..0ae48d6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -296,7 +296,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { 5 <{1,2}> 0.75 */ - val result = prefixspan.run(rdd) + val model = prefixspan.run(rdd) val expected = Array( (Array(Array(1)), 3L), (Array(Array(2)), 3L), @@ -304,7 +304,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { (Array(Array(1), Array(3)), 2L), (Array(Array(1, 2)), 3L) ) - compareResults(expected, result.collect()) + compareResults(expected, model.freqSequences.collect().map(x => (x.sequence, x.freq))) } test("PrefixSpan String type, variable-size itemsets") { @@ -322,7 +322,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { .setMinSupport(0.5) .setMaxPatternLength(5) - val result = prefixspan.run(rdd) + val model = prefixspan.run(rdd) val expected = Array( (Array(Array(1)), 3L), (Array(Array(2)), 3L), @@ -332,7 +332,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { ).map { case (pattern, count) => (pattern.map(itemSet => itemSet.map(intToString)), count) } - compareResults(expected, result.collect()) + compareResults(expected, model.freqSequences.collect().map(x => (x.sequence, x.freq))) } private def compareResults[Item]( --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org