Repository: spark Updated Branches: refs/heads/master 7251be0c0 -> f83fedc9f
[SPARK-25737][CORE] Remove JavaSparkContextVarargsWorkaround ## What changes were proposed in this pull request? Remove JavaSparkContextVarargsWorkaround ## How was this patch tested? Existing tests. Closes #22729 from srowen/SPARK-25737. Authored-by: Sean Owen <[email protected]> Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f83fedc9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f83fedc9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f83fedc9 Branch: refs/heads/master Commit: f83fedc9f20869ab4c62bb07bac50113d921207f Parents: 7251be0 Author: Sean Owen <[email protected]> Authored: Wed Oct 24 14:43:51 2018 -0500 Committer: Sean Owen <[email protected]> Committed: Wed Oct 24 14:43:51 2018 -0500 ---------------------------------------------------------------------- .../java/JavaSparkContextVarargsWorkaround.java | 67 -------------------- .../spark/api/java/JavaSparkContext.scala | 42 ++++++------ .../test/org/apache/spark/JavaAPISuite.java | 5 -- .../streaming/JavaKinesisWordCountASL.java | 2 +- project/MimaExcludes.scala | 7 ++ python/pyspark/context.py | 8 ++- python/pyspark/streaming/context.py | 8 ++- .../api/java/JavaStreamingContext.scala | 27 ++++---- 8 files changed, 53 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java deleted file mode 100644 index 0dd8faf..0000000 --- a/core/src/main/java/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java; - -import java.util.ArrayList; -import java.util.List; - -// See -// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html -abstract class JavaSparkContextVarargsWorkaround { - - @SafeVarargs - public final <T> JavaRDD<T> union(JavaRDD<T>... rdds) { - if (rdds.length == 0) { - throw new IllegalArgumentException("Union called on empty list"); - } - List<JavaRDD<T>> rest = new ArrayList<>(rdds.length - 1); - for (int i = 1; i < rdds.length; i++) { - rest.add(rdds[i]); - } - return union(rdds[0], rest); - } - - public JavaDoubleRDD union(JavaDoubleRDD... rdds) { - if (rdds.length == 0) { - throw new IllegalArgumentException("Union called on empty list"); - } - List<JavaDoubleRDD> rest = new ArrayList<>(rdds.length - 1); - for (int i = 1; i < rdds.length; i++) { - rest.add(rdds[i]); - } - return union(rdds[0], rest); - } - - @SafeVarargs - public final <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) { - if (rdds.length == 0) { - throw new IllegalArgumentException("Union called on empty list"); - } - List<JavaPairRDD<K, V>> rest = new ArrayList<>(rdds.length - 1); - for (int i = 1; i < rdds.length; i++) { - rest.add(rdds[i]); - } - return union(rdds[0], rest); - } - - // These methods take separate "first" and "rest" elements to avoid having the same type erasure - public abstract <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest); - public abstract JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest); - public abstract <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> - rest); -} http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index ef15f95..03f259d 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -21,6 +21,7 @@ import java.io.Closeable import java.util import java.util.{Map => JMap} +import scala.annotation.varargs import scala.collection.JavaConverters._ import scala.language.implicitConversions import scala.reflect.ClassTag @@ -33,7 +34,7 @@ import org.apache.spark._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream -import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} +import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD} /** * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns @@ -42,8 +43,7 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details. */ -class JavaSparkContext(val sc: SparkContext) - extends JavaSparkContextVarargsWorkaround with Closeable { +class JavaSparkContext(val sc: SparkContext) extends Closeable { /** * Create a JavaSparkContext that loads settings from system properties (for instance, when @@ -506,27 +506,29 @@ class JavaSparkContext(val sc: SparkContext) new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]]) } - /** Build the union of two or more RDDs. */ - override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = { - val rdds: Seq[RDD[T]] = (Seq(first) ++ rest.asScala).map(_.rdd) - implicit val ctag: ClassTag[T] = first.classTag - sc.union(rdds) + /** Build the union of JavaRDDs. */ + @varargs + def union[T](rdds: JavaRDD[T]*): JavaRDD[T] = { + require(rdds.nonEmpty, "Union called on no RDDs") + implicit val ctag: ClassTag[T] = rdds.head.classTag + sc.union(rdds.map(_.rdd)) } - /** Build the union of two or more RDDs. */ - override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]]) - : JavaPairRDD[K, V] = { - val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ rest.asScala).map(_.rdd) - implicit val ctag: ClassTag[(K, V)] = first.classTag - implicit val ctagK: ClassTag[K] = first.kClassTag - implicit val ctagV: ClassTag[V] = first.vClassTag - new JavaPairRDD(sc.union(rdds)) + /** Build the union of JavaPairRDDs. */ + @varargs + def union[K, V](rdds: JavaPairRDD[K, V]*): JavaPairRDD[K, V] = { + require(rdds.nonEmpty, "Union called on no RDDs") + implicit val ctag: ClassTag[(K, V)] = rdds.head.classTag + implicit val ctagK: ClassTag[K] = rdds.head.kClassTag + implicit val ctagV: ClassTag[V] = rdds.head.vClassTag + new JavaPairRDD(sc.union(rdds.map(_.rdd))) } - /** Build the union of two or more RDDs. */ - override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = { - val rdds: Seq[RDD[Double]] = (Seq(first) ++ rest.asScala).map(_.srdd) - new JavaDoubleRDD(sc.union(rdds)) + /** Build the union of JavaDoubleRDDs. */ + @varargs + def union(rdds: JavaDoubleRDD*): JavaDoubleRDD = { + require(rdds.nonEmpty, "Union called on no RDDs") + new JavaDoubleRDD(sc.union(rdds.map(_.srdd))) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/core/src/test/java/test/org/apache/spark/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java index 365a93d..f979f9e 100644 --- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java @@ -106,11 +106,6 @@ public class JavaAPISuite implements Serializable { // Varargs JavaRDD<String> sUnion = sc.union(s1, s2); assertEquals(4, sUnion.count()); - // List - List<JavaRDD<String>> list = new ArrayList<>(); - list.add(s2); - sUnion = sc.union(s1, list); - assertEquals(4, sUnion.count()); // Union of JavaDoubleRDDs List<Double> doubles = Arrays.asList(1.0, 2.0); http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java ---------------------------------------------------------------------- diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index 626bde4..86c42df 100644 --- a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -145,7 +145,7 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr // Union all the streams if there is more than 1 stream JavaDStream<byte[]> unionStreams; if (streamsList.size() > 1) { - unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size())); + unionStreams = jssc.union(streamsList.toArray(new JavaDStream[0])); } else { // Otherwise, just use the 1 stream unionStreams = streamsList.get(0); http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index d6beac1..350d8ad 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,10 @@ object MimaExcludes { // Exclude rules for 3.0.x lazy val v30excludes = v24excludes ++ Seq( + // [SPARK-25737] Remove JavaSparkContextVarargsWorkaround + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.api.java.JavaSparkContext"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.union"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.union"), // [SPARK-16775] Remove deprecated accumulator v1 APIs ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Accumulable"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.AccumulatorParam"), @@ -55,9 +59,12 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.accumulable"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.doubleAccumulator"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.api.java.JavaSparkContext.accumulator"), + // [SPARK-24109] Remove class SnappyOutputStreamWrapper ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.SnappyCompressionCodec.version"), + // [SPARK-19287] JavaPairRDD flatMapValues requires function returning Iterable, not Iterator ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.api.java.JavaPairRDD.flatMapValues"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.api.java.JavaPairDStream.flatMapValues"), + // [SPARK-25680] SQL execution listener shouldn't happen on execution thread ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.util.ExecutionListenerManager.clone"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.util.ExecutionListenerManager.this") ) http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/python/pyspark/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 0924d3d..1180bf9 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -834,9 +834,11 @@ class SparkContext(object): first_jrdd_deserializer = rdds[0]._jrdd_deserializer if any(x._jrdd_deserializer != first_jrdd_deserializer for x in rdds): rdds = [x._reserialize() for x in rdds] - first = rdds[0]._jrdd - rest = [x._jrdd for x in rdds[1:]] - return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer) + cls = SparkContext._jvm.org.apache.spark.api.java.JavaRDD + jrdds = SparkContext._gateway.new_array(cls, len(rdds)) + for i in range(0, len(rdds)): + jrdds[i] = rdds[i]._jrdd + return RDD(self._jsc.union(jrdds), self, rdds[0]._jrdd_deserializer) def broadcast(self, value): """ http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/python/pyspark/streaming/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 3fa57ca..e1c194b 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -343,9 +343,11 @@ class StreamingContext(object): raise ValueError("All DStreams should have same serializer") if len(set(s._slideDuration for s in dstreams)) > 1: raise ValueError("All DStreams should have same slide duration") - first = dstreams[0] - jrest = [d._jdstream for d in dstreams[1:]] - return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer) + cls = SparkContext._jvm.org.apache.spark.streaming.api.java.JavaDStream + jdstreams = SparkContext._gateway.new_array(cls, len(dstreams)) + for i in range(0, len(dstreams)): + jdstreams[i] = dstreams[i]._jdstream + return DStream(self._jssc.union(jdstreams), self, dstreams[0]._jrdd_deserializer) def addStreamingListener(self, streamingListener): """ http://git-wip-us.apache.org/repos/asf/spark/blob/f83fedc9/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 982e72c..e61c0d4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -21,6 +21,7 @@ import java.io.{Closeable, InputStream} import java.lang.{Boolean => JBoolean} import java.util.{List => JList, Map => JMap} +import scala.annotation.varargs import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -36,7 +37,6 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler.StreamingListener @@ -431,24 +431,23 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ - def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = { - val dstreams: Seq[DStream[T]] = (Seq(first) ++ rest.asScala).map(_.dstream) - implicit val cm: ClassTag[T] = first.classTag - ssc.union(dstreams)(cm) + @varargs + def union[T](jdstreams: JavaDStream[T]*): JavaDStream[T] = { + require(jdstreams.nonEmpty, "Union called on no streams") + implicit val cm: ClassTag[T] = jdstreams.head.classTag + ssc.union(jdstreams.map(_.dstream))(cm) } /** * Create a unified DStream from multiple DStreams of the same type and same slide duration. */ - def union[K, V]( - first: JavaPairDStream[K, V], - rest: JList[JavaPairDStream[K, V]] - ): JavaPairDStream[K, V] = { - val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ rest.asScala).map(_.dstream) - implicit val cm: ClassTag[(K, V)] = first.classTag - implicit val kcm: ClassTag[K] = first.kManifest - implicit val vcm: ClassTag[V] = first.vManifest - new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm) + @varargs + def union[K, V](jdstreams: JavaPairDStream[K, V]*): JavaPairDStream[K, V] = { + require(jdstreams.nonEmpty, "Union called on no streams") + implicit val cm: ClassTag[(K, V)] = jdstreams.head.classTag + implicit val kcm: ClassTag[K] = jdstreams.head.kManifest + implicit val vcm: ClassTag[V] = jdstreams.head.vManifest + new JavaPairDStream[K, V](ssc.union(jdstreams.map(_.dstream))(cm))(kcm, vcm) } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
