Repository: spark
Updated Branches:
  refs/heads/master 5c92d47ad -> e7690ed20


SPARK-2811 upgrade algebird to 0.8.1

Author: Adam Pingel <[email protected]>

Closes #3282 from adampingel/master and squashes the following commits:

70c8d3c [Adam Pingel] relocate the algebird example back to example/src
7a9d8be [Adam Pingel] SPARK-2811 upgrade algebird to 0.8.1


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7690ed2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7690ed2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7690ed2

Branch: refs/heads/master
Commit: e7690ed20a2734b7ca88e78a60a8e75ba19e9d8b
Parents: 5c92d47
Author: Adam Pingel <[email protected]>
Authored: Mon Nov 17 10:47:29 2014 -0800
Committer: Patrick Wendell <[email protected]>
Committed: Mon Nov 17 10:47:29 2014 -0800

----------------------------------------------------------------------
 examples/pom.xml                                |  14 +--
 .../examples/streaming/TwitterAlgebirdCMS.scala | 114 -------------------
 .../examples/streaming/TwitterAlgebirdHLL.scala |  92 ---------------
 .../examples/streaming/TwitterAlgebirdCMS.scala | 114 +++++++++++++++++++
 .../examples/streaming/TwitterAlgebirdHLL.scala |  92 +++++++++++++++
 5 files changed, 213 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e7690ed2/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 2752ce3..85e1337 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -218,6 +218,11 @@
       <artifactId>commons-math3</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>algebird-core_${scala.binary.version}</artifactId>
+      <version>0.8.1</version>
+    </dependency>
+    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
@@ -389,8 +394,8 @@
       </properties>
     </profile>
     <profile>
-      <!-- We add a source directory specific to Scala 2.10 since Kafka and Algebird
-           only work with it -->
+      <!-- We add a source directory specific to Scala 2.10 since Kafka 
+           only works with it -->
       <id>scala-2.10</id>
       <activation>
         <property><name>!scala-2.11</name></property>
@@ -401,11 +406,6 @@
           
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
           <version>${project.version}</version>
         </dependency>
-        <dependency>
-          <groupId>com.twitter</groupId>
-          <artifactId>algebird-core_${scala.binary.version}</artifactId>
-          <version>0.1.11</version>
-        </dependency>
       </dependencies>
       <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/spark/blob/e7690ed2/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
deleted file mode 100644
index 683752a..0000000
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird._
-
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkContext._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.twitter._
-
-// scalastyle:off
-/**
- * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
- * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
- * <br>
- *   <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
- *   the example operates on Long IDs. Once the implementation supports other inputs (such as String),
- *   the same approach could be used for computing popular topics for example.
- * <p>
- * <p>
- *   <a href=
- *   "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
- *   structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
- *   of any given element, etc), that uses space sub-linear in the number of elements in the
- *   stream. Once elements are added to the CMS, the estimated count of an element can be computed,
- *   as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
- *   count.
- * <p><p>
- *   Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
- *   reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdCMS {
-  def main(args: Array[String]) {
-    StreamingExamples.setStreamingLogLevels()
-
-    // CMS parameters
-    val DELTA = 1E-3
-    val EPS = 0.01
-    val SEED = 1
-    val PERC = 0.001
-    // K highest frequency elements to take
-    val TOPK = 10
-
-    val filters = args
-    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
-    val ssc = new StreamingContext(sparkConf, Seconds(10))
-    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
-
-    val users = stream.map(status => status.getUser.getId)
-
-    val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
-    var globalCMS = cms.zero
-    val mm = new MapMonoid[Long, Int]()
-    var globalExact = Map[Long, Int]()
-
-    val approxTopUsers = users.mapPartitions(ids => {
-      ids.map(id => cms.create(id))
-    }).reduce(_ ++ _)
-
-    val exactTopUsers = users.map(id => (id, 1))
-      .reduceByKey((a, b) => a + b)
-
-    approxTopUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        val partialTopK = partial.heavyHitters.map(id =>
-          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
-        globalCMS ++= partial
-        val globalTopK = globalCMS.heavyHitters.map(id =>
-          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
-        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
-          partialTopK.mkString("[", ",", "]")))
-        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
-          globalTopK.mkString("[", ",", "]")))
-      }
-    })
-
-    exactTopUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partialMap = rdd.collect().toMap
-        val partialTopK = rdd.map(
-          {case (id, count) => (count, id)})
-          .sortByKey(ascending = false).take(TOPK)
-        globalExact = mm.plus(globalExact.toMap, partialMap)
-        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
-        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
-        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
-      }
-    })
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7690ed2/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
deleted file mode 100644
index 62db5e6..0000000
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird.HyperLogLogMonoid
-import com.twitter.algebird.HyperLogLog._
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-import org.apache.spark.SparkConf
-
-// scalastyle:off
-/**
- * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
- * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
- * <p>
- * <p>
- *   This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- *   blog post</a> and this
- *   <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
- *     blog post</a>
- *   have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
- *   estimating the cardinality of a data stream, i.e. the number of unique elements.
- * <p><p>
- *   Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
- *   reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdHLL {
-  def main(args: Array[String]) {
-
-    StreamingExamples.setStreamingLogLevels()
-
-    /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
-    val BIT_SIZE = 12
-    val filters = args
-    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
-    val ssc = new StreamingContext(sparkConf, Seconds(5))
-    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
-
-    val users = stream.map(status => status.getUser.getId)
-
-    val hll = new HyperLogLogMonoid(BIT_SIZE)
-    var globalHll = hll.zero
-    var userSet: Set[Long] = Set()
-
-    val approxUsers = users.mapPartitions(ids => {
-      ids.map(id => hll(id))
-    }).reduce(_ + _)
-
-    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
-
-    approxUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        globalHll += partial
-        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
-        println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
-      }
-    })
-
-    exactUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        userSet ++= partial
-        println("Exact distinct users this batch: %d".format(partial.size))
-        println("Exact distinct users overall: %d".format(userSet.size))
-        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
-          ) * 100))
-      }
-    })
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7690ed2/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
new file mode 100644
index 0000000..683752a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.twitter._
+
+// scalastyle:off
+/**
+ * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
+ * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
+ * <br>
+ *   <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
+ *   the example operates on Long IDs. Once the implementation supports other inputs (such as String),
+ *   the same approach could be used for computing popular topics for example.
+ * <p>
+ * <p>
+ *   <a href=
+ *   "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
+ *   structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
+ *   of any given element, etc), that uses space sub-linear in the number of elements in the
+ *   stream. Once elements are added to the CMS, the estimated count of an element can be computed,
+ *   as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
+ *   count.
+ * <p><p>
+ *   Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
+ *   reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdCMS {
+  def main(args: Array[String]) {
+    StreamingExamples.setStreamingLogLevels()
+
+    // CMS parameters
+    val DELTA = 1E-3
+    val EPS = 0.01
+    val SEED = 1
+    val PERC = 0.001
+    // K highest frequency elements to take
+    val TOPK = 10
+
+    val filters = args
+    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
+    val ssc = new StreamingContext(sparkConf, Seconds(10))
+    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
+    var globalCMS = cms.zero
+    val mm = new MapMonoid[Long, Int]()
+    var globalExact = Map[Long, Int]()
+
+    val approxTopUsers = users.mapPartitions(ids => {
+      ids.map(id => cms.create(id))
+    }).reduce(_ ++ _)
+
+    val exactTopUsers = users.map(id => (id, 1))
+      .reduceByKey((a, b) => a + b)
+
+    approxTopUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        val partialTopK = partial.heavyHitters.map(id =>
+          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        globalCMS ++= partial
+        val globalTopK = globalCMS.heavyHitters.map(id =>
+          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
+          partialTopK.mkString("[", ",", "]")))
+        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
+          globalTopK.mkString("[", ",", "]")))
+      }
+    })
+
+    exactTopUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partialMap = rdd.collect().toMap
+        val partialTopK = rdd.map(
+          {case (id, count) => (count, id)})
+          .sortByKey(ascending = false).take(TOPK)
+        globalExact = mm.plus(globalExact.toMap, partialMap)
+        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
+        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
+      }
+    })
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7690ed2/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
new file mode 100644
index 0000000..62db5e6
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird.HyperLogLogMonoid
+import com.twitter.algebird.HyperLogLog._
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.twitter._
+import org.apache.spark.SparkConf
+
+// scalastyle:off
+/**
+ * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
+ * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
+ * <p>
+ * <p>
+ *   This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ *   blog post</a> and this
+ *   <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
+ *     blog post</a>
+ *   have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
+ *   estimating the cardinality of a data stream, i.e. the number of unique elements.
+ * <p><p>
+ *   Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
+ *   reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdHLL {
+  def main(args: Array[String]) {
+
+    StreamingExamples.setStreamingLogLevels()
+
+    /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
+    val BIT_SIZE = 12
+    val filters = args
+    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
+    val ssc = new StreamingContext(sparkConf, Seconds(5))
+    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    val hll = new HyperLogLogMonoid(BIT_SIZE)
+    var globalHll = hll.zero
+    var userSet: Set[Long] = Set()
+
+    val approxUsers = users.mapPartitions(ids => {
+      ids.map(id => hll(id))
+    }).reduce(_ + _)
+
+    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+
+    approxUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        globalHll += partial
+        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+        println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
+      }
+    })
+
+    exactUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        userSet ++= partial
+        println("Exact distinct users this batch: %d".format(partial.size))
+        println("Exact distinct users overall: %d".format(userSet.size))
+        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
+          ) * 100))
+      }
+    })
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to