Repository: spark
Updated Branches:
  refs/heads/master 917d3fc06 -> ac56cf605
[SPARK-12604][CORE] Java count(ApproxDistinct)ByKey methods return Scala Long, not Java Long

Change the Java countByKey and countApproxDistinctByKey return types to use Java Long rather than Scala Long, and update similar methods for consistency, standardizing on java.lang.Long.valueOf, with no API change.

Author: Sean Owen <[email protected]>

Closes #10554 from srowen/SPARK-12604.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac56cf60
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac56cf60
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac56cf60

Branch: refs/heads/master
Commit: ac56cf605b61803c26e0004b43c703cca7e02d61
Parents: 917d3fc
Author: Sean Owen <[email protected]>
Authored: Wed Jan 6 17:17:32 2016 -0800
Committer: Reynold Xin <[email protected]>
Committed: Wed Jan 6 17:17:32 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaPairRDD.scala | 32 ++++++++++++--------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 16 +++++-----
 .../java/org/apache/spark/JavaAPISuite.java     | 18 +++++------
 .../streaming/api/java/JavaDStreamLike.scala    | 18 +++++------
 .../streaming/api/java/JavaPairDStream.scala    |  7 +++--
 5 files changed, 49 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 91dc186..76752e1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.api.java
 
+import java.{lang => jl}
 import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, List => JList, Map => JMap}
+import java.util.{Comparator, List => JList}
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -139,7 +140,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * math.ceil(numItems * samplingRate) over all key values.
    */
   def sampleByKey(withReplacement: Boolean,
-      fractions: JMap[K, Double],
+      fractions: java.util.Map[K, Double],
       seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions.asScala, seed))
@@ -154,7 +155,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Use Utils.random.nextLong as the default seed for the random number generator.
    */
   def sampleByKey(withReplacement: Boolean,
-      fractions: JMap[K, Double]): JavaPairRDD[K, V] =
+      fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] =
     sampleByKey(withReplacement, fractions, Utils.random.nextLong)
@@ -168,7 +169,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * two additional passes.
    */
   def sampleByKeyExact(withReplacement: Boolean,
-      fractions: JMap[K, Double],
+      fractions: java.util.Map[K, Double],
       seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed))
@@ -184,7 +185,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    *
    * Use Utils.random.nextLong as the default seed for the random number generator.
   */
-  def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
+  def sampleByKeyExact(
+      withReplacement: Boolean,
+      fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] =
     sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong)
 
   /**
@@ -292,7 +295,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))
 
   /** Count the number of elements for each key, and return the result to the master as a Map. */
-  def countByKey(): java.util.Map[K, Long] = mapAsSerializableJavaMap(rdd.countByKey())
+  def countByKey(): java.util.Map[K, jl.Long] =
+    mapAsSerializableJavaMap(rdd.countByKey().mapValues(jl.Long.valueOf))
 
   /**
    * Approximate version of countByKey that can return a partial result if it does
@@ -934,9 +938,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * It must be greater than 0.000017.
    * @param partitioner partitioner of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
-  {
-    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner)
+      : JavaPairRDD[K, jl.Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)).
+      asInstanceOf[JavaPairRDD[K, jl.Long]]
   }
 
   /**
@@ -950,8 +955,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * It must be greater than 0.000017.
    * @param numPartitions number of partitions of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
-    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, jl.Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)).
+      asInstanceOf[JavaPairRDD[K, jl.Long]]
   }
 
   /**
@@ -964,8 +970,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
    * It must be greater than 0.000017.
    */
-  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
-    fromRDD(rdd.countApproxDistinctByKey(relativeSD))
+  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, jl.Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD)).asInstanceOf[JavaPairRDD[K, jl.Long]]
   }
 
   /** Assign a name to this RDD */
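
As a usage illustration (editorial, not part of the commit): after this patch the JavaPairRDD methods above read naturally from Java, with no cast through Object. The class name, variable names, and the JavaSparkContext sc below are all invented for the example; only countByKey and countApproxDistinctByKey come from the API being changed.

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class CountByKeyExample {
      public static void demo(JavaSparkContext sc) {
        JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
            new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));
        // countByKey() now declares java.lang.Long values, so this compiles
        // without the (Map<K, Long>) (Object) double cast seen in older tests.
        Map<String, Long> counts = pairs.countByKey();  // {a=2, b=1}
        // countApproxDistinctByKey() likewise yields java.lang.Long values;
        // 0.05 is an arbitrary relative accuracy above the 0.000017 floor.
        JavaPairRDD<String, Long> approx = pairs.countApproxDistinctByKey(0.05);
        approx.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));
      }
    }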

http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6d3485d..1b1a9dc 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.api.java
 
 import java.{lang => jl}
-import java.lang.{Iterable => JIterable, Long => JLong}
+import java.lang.{Iterable => JIterable}
 import java.util.{Comparator, Iterator => JIterator, List => JList}
 
 import scala.collection.JavaConverters._
@@ -305,8 +305,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
    * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
    */
-  def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
-    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
+  def zipWithUniqueId(): JavaPairRDD[T, jl.Long] = {
+    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, jl.Long]]
   }
 
   /**
@@ -316,8 +316,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
    * This method needs to trigger a spark job when this RDD contains more than one partitions.
    */
-  def zipWithIndex(): JavaPairRDD[T, JLong] = {
-    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
+  def zipWithIndex(): JavaPairRDD[T, jl.Long] = {
+    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, jl.Long]]
   }
 
   // Actions (launch a job to return a value to the user program)
@@ -448,7 +448,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * combine step happens locally on the master, equivalent to running a single reduce task.
    */
   def countByValue(): java.util.Map[T, jl.Long] =
-    mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2)))))
+    mapAsSerializableJavaMap(rdd.countByValue().mapValues(jl.Long.valueOf))
 
   /**
    * (Experimental) Approximate version of countByValue().
@@ -631,8 +631,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * The asynchronous version of `count`, which returns a
    * future for counting the number of elements in this RDD.
    */
-  def countAsync(): JavaFutureAction[JLong] = {
-    new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
+  def countAsync(): JavaFutureAction[jl.Long] = {
+    new JavaFutureActionWrapper[Long, jl.Long](rdd.countAsync(), jl.Long.valueOf)
   }
 
   /**
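
Again illustrative only, with the same assumed sc: the point of the JavaRDDLike changes is that zipWithUniqueId, zipWithIndex, countByValue, and countAsync now all advertise java.lang.Long consistently.

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.spark.api.java.JavaFutureAction;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class JavaLongConsistencyExample {
      public static void demo(JavaSparkContext sc) throws Exception {
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "a"));
        // The index values are java.lang.Long, not scala.Long erased to Object.
        JavaPairRDD<String, Long> indexed = rdd.zipWithIndex();
        Map<String, Long> byValue = rdd.countByValue();    // {a=2, b=1}
        JavaFutureAction<Long> pending = rdd.countAsync(); // future of java.lang.Long
        System.out.println(pending.get());                 // 3
      }
    }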

http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 502f86f..47382e4 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1580,11 +1580,11 @@ public class JavaAPISuite implements Serializable {
     }
     double relativeSD = 0.001;
     JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
-    List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
-    for (Tuple2<Integer, Object> resItem : res) {
-      double count = (double)resItem._1();
-      Long resCount = (Long)resItem._2();
-      Double error = Math.abs((resCount - count) / count);
+    List<Tuple2<Integer, Long>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
+    for (Tuple2<Integer, Long> resItem : res) {
+      double count = resItem._1();
+      long resCount = resItem._2();
+      double error = Math.abs((resCount - count) / count);
       Assert.assertTrue(error < 0.1);
     }
@@ -1633,12 +1633,12 @@ public class JavaAPISuite implements Serializable {
     fractions.put(0, 0.5);
     fractions.put(1, 1.0);
     JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L);
-    Map<Integer, Long> wrCounts = (Map<Integer, Long>) (Object) wr.countByKey();
+    Map<Integer, Long> wrCounts = wr.countByKey();
     Assert.assertEquals(2, wrCounts.size());
     Assert.assertTrue(wrCounts.get(0) > 0);
     Assert.assertTrue(wrCounts.get(1) > 0);
     JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L);
-    Map<Integer, Long> worCounts = (Map<Integer, Long>) (Object) wor.countByKey();
+    Map<Integer, Long> worCounts = wor.countByKey();
     Assert.assertEquals(2, worCounts.size());
     Assert.assertTrue(worCounts.get(0) > 0);
     Assert.assertTrue(worCounts.get(1) > 0);
@@ -1659,12 +1659,12 @@ public class JavaAPISuite implements Serializable {
     fractions.put(0, 0.5);
     fractions.put(1, 1.0);
     JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
-    Map<Integer, Long> wrExactCounts = (Map<Integer, Long>) (Object) wrExact.countByKey();
+    Map<Integer, Long> wrExactCounts = wrExact.countByKey();
     Assert.assertEquals(2, wrExactCounts.size());
     Assert.assertTrue(wrExactCounts.get(0) == 2);
     Assert.assertTrue(wrExactCounts.get(1) == 4);
     JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
-    Map<Integer, Long> worExactCounts = (Map<Integer, Long>) (Object) worExact.countByKey();
+    Map<Integer, Long> worExactCounts = worExact.countByKey();
     Assert.assertEquals(2, worExactCounts.size());
     Assert.assertTrue(worExactCounts.get(0) == 2);
     Assert.assertTrue(worExactCounts.get(1) == 4);
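
The JavaAPISuite edits above boil down to deleting a double cast that was only needed while the declared value type was scala.Long (which erases to Object in the Java signature). A hedged before/after sketch; wr stands in for any JavaPairRDD<Integer, Integer>:

    import java.util.Map;
    import org.apache.spark.api.java.JavaPairRDD;

    public class CastRemovalExample {
      public static Map<Integer, Long> counts(JavaPairRDD<Integer, Integer> wr) {
        // Before this patch, tests laundered the result through Object:
        //   Map<Integer, Long> wrCounts = (Map<Integer, Long>) (Object) wr.countByKey();
        // After it, the declared value type is java.lang.Long, so a plain,
        // type-safe assignment suffices.
        return wr.countByKey();
      }
    }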

http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 84acec7..733147f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.streaming.api.java
 
-import java.lang.{Long => JLong}
+import java.{lang => jl}
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
@@ -50,8 +50,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   def wrapRDD(in: RDD[T]): R
 
-  implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
-    in.map(new JLong(_))
+  implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[jl.Long] = {
+    in.map(jl.Long.valueOf)
   }
 
   /**
@@ -74,14 +74,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.
    */
-  def count(): JavaDStream[JLong] = dstream.count()
+  def count(): JavaDStream[jl.Long] = dstream.count()
 
   /**
    * Return a new DStream in which each RDD contains the counts of each distinct value in
    * each RDD of this DStream. Hash partitioning is used to generate the RDDs with
    * Spark's default number of partitions.
    */
-  def countByValue(): JavaPairDStream[T, JLong] = {
+  def countByValue(): JavaPairDStream[T, jl.Long] = {
     JavaPairDStream.scalaToJavaLong(dstream.countByValue())
   }
 
@@ -91,7 +91,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * partitions.
    * @param numPartitions number of partitions of each RDD in the new DStream.
    */
-  def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = {
+  def countByValue(numPartitions: Int): JavaPairDStream[T, jl.Long] = {
     JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions))
   }
 
@@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * of elements in a window over this DStream. windowDuration and slideDuration are as defined in
    * the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
    */
-  def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
+  def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = {
     dstream.countByWindow(windowDuration, slideDuration)
   }
 
@@ -116,7 +116,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * DStream's batching interval
    */
   def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
-    : JavaPairDStream[T, JLong] = {
+    : JavaPairDStream[T, jl.Long] = {
     JavaPairDStream.scalaToJavaLong(
       dstream.countByValueAndWindow(windowDuration, slideDuration))
   }
 
@@ -133,7 +133,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * @param numPartitions number of partitions of each RDD in the new DStream.
    */
   def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
-    : JavaPairDStream[T, JLong] = {
+    : JavaPairDStream[T, jl.Long] = {
     JavaPairDStream.scalaToJavaLong(
       dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2bf3cce..af0d84b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.streaming.api.java
 
-import java.lang.{Iterable => JIterable, Long => JLong}
+import java.{lang => jl}
+import java.lang.{Iterable => JIterable}
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
@@ -847,7 +848,7 @@ object JavaPairDStream {
   }
 
   def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
-    : JavaPairDStream[K, JLong] = {
-    DStream.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+    : JavaPairDStream[K, jl.Long] = {
+    DStream.toPairDStreamFunctions(dstream.dstream).mapValues(jl.Long.valueOf)
   }
 }
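
For the streaming side, an equally hypothetical sketch; lines stands in for any JavaDStream<String>, e.g. obtained from an assumed JavaStreamingContext:

    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    public class StreamingCountsExample {
      public static void demo(JavaDStream<String> lines) {
        // Both counts are java.lang.Long after this patch, matching the core API.
        JavaDStream<Long> counts = lines.count();
        JavaPairDStream<String, Long> perValue = lines.countByValue();
        perValue.print();
      }
    }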

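Finally, a note on the recurring new JLong(_) -> jl.Long.valueOf swap in this patch: valueOf can reuse cached boxes for small values, while the constructor always allocates a fresh object. A standalone demonstration (plain JDK, nothing Spark-specific):

    public class BoxingDemo {
      public static void main(String[] args) {
        Long a = Long.valueOf(42L);
        Long b = Long.valueOf(42L);
        System.out.println(a == b);  // true: values in [-128, 127] share a cache
        Long c = new Long(42L);      // always a new allocation (deprecated since Java 9)
        Long d = new Long(42L);
        System.out.println(c == d);  // false
      }
    }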