Repository: spark
Updated Branches:
  refs/heads/master 794ea553b -> b9c835337


[SPARK-12618][CORE][STREAMING][SQL] Clean up build warnings: 2.0.0 edition

Fix most build warnings, which are mostly deprecated API usages. I'll annotate
some of the changes below. CC rxin, who is leading the charge to remove the
deprecated APIs.

Author: Sean Owen <[email protected]>

Closes #10570 from srowen/SPARK-12618.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9c83533
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9c83533
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9c83533

Branch: refs/heads/master
Commit: b9c835337880f57fe8b953962913bcc524162348
Parents: 794ea55
Author: Sean Owen <[email protected]>
Authored: Fri Jan 8 17:47:44 2016 +0000
Committer: Sean Owen <[email protected]>
Committed: Fri Jan 8 17:47:44 2016 +0000

----------------------------------------------------------------------
 .../test/scala/org/apache/spark/Smuggle.scala   |  1 +
 .../JavaBinaryClassificationMetricsExample.java |  5 +-
 .../mllib/JavaRankingMetricsExample.java        | 21 +++++--
 .../JavaRecoverableNetworkWordCount.java        |  8 +--
 .../streaming/JavaSqlNetworkWordCount.java      |  8 +--
 .../JavaTwitterHashTagJoinSentiments.java       | 36 +++++------
 .../org/apache/spark/examples/SparkHdfsLR.scala |  2 +-
 .../spark/examples/SparkTachyonHdfsLR.scala     |  2 +-
 .../kafka/JavaDirectKafkaStreamSuite.java       |  7 +--
 .../streaming/kafka/JavaKafkaStreamSuite.java   |  8 +--
 .../streaming/kinesis/KinesisStreamSuite.scala  |  8 +--
 .../apache/spark/mllib/clustering/KMeans.scala  |  8 +--
 .../mllib/recommendation/JavaALSSuite.java      |  4 +-
 .../regression/JavaIsotonicRegressionSuite.java | 18 +++---
 python/pyspark/mllib/clustering.py              |  2 +-
 .../expressions/ExpressionEvalHelper.scala      |  8 +--
 .../sql/catalyst/util/DateTimeUtilsSuite.scala  |  3 -
 .../SpecificParquetRecordReaderBase.java        | 19 +++---
 .../spark/sql/ColumnExpressionSuite.scala       |  4 +-
 .../scala/org/apache/spark/sql/QueryTest.scala  |  5 +-
 .../execution/columnar/ColumnarTestUtils.scala  |  1 +
 .../apache/spark/streaming/JavaAPISuite.java    |  4 +-
 .../spark/streaming/JavaMapWithStateSuite.java  | 64 ++++++++------------
 .../spark/streaming/JavaReceiverAPISuite.java   | 14 ++---
 24 files changed, 123 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/core/src/test/scala/org/apache/spark/Smuggle.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/Smuggle.scala 
b/core/src/test/scala/org/apache/spark/Smuggle.scala
index 01694a6..9f0a1b4 100644
--- a/core/src/test/scala/org/apache/spark/Smuggle.scala
+++ b/core/src/test/scala/org/apache/spark/Smuggle.scala
@@ -21,6 +21,7 @@ import java.util.UUID
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.mutable
+import scala.language.implicitConversions
 
 /**
   * Utility wrapper to "smuggle" objects into tasks while bypassing 
serialization.

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
 
b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
index 779fac0..3d8babb 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java
@@ -56,6 +56,7 @@ public class JavaBinaryClassificationMetricsExample {
     // Compute raw scores on the test set.
     JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
       new Function<LabeledPoint, Tuple2<Object, Object>>() {
+        @Override
         public Tuple2<Object, Object> call(LabeledPoint p) {
           Double prediction = model.predict(p.features());
           return new Tuple2<Object, Object>(prediction, p.label());
@@ -88,6 +89,7 @@ public class JavaBinaryClassificationMetricsExample {
     // Thresholds
     JavaRDD<Double> thresholds = precision.map(
       new Function<Tuple2<Object, Object>, Double>() {
+        @Override
         public Double call(Tuple2<Object, Object> t) {
           return new Double(t._1().toString());
         }
@@ -106,8 +108,7 @@ public class JavaBinaryClassificationMetricsExample {
 
     // Save and load model
     model.save(sc, "target/tmp/LogisticRegressionModel");
-    LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
-      "target/tmp/LogisticRegressionModel");
+    LogisticRegressionModel.load(sc, "target/tmp/LogisticRegressionModel");
     // $example off$
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
 
b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
index 47ab3fc..4ad2104 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
@@ -41,6 +41,7 @@ public class JavaRankingMetricsExample {
     JavaRDD<String> data = sc.textFile(path);
     JavaRDD<Rating> ratings = data.map(
       new Function<String, Rating>() {
+        @Override
         public Rating call(String line) {
           String[] parts = line.split("::");
             return new Rating(Integer.parseInt(parts[0]), 
Integer.parseInt(parts[1]), Double
@@ -57,13 +58,14 @@ public class JavaRankingMetricsExample {
     JavaRDD<Tuple2<Object, Rating[]>> userRecs = 
model.recommendProductsForUsers(10).toJavaRDD();
     JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(
       new Function<Tuple2<Object, Rating[]>, Tuple2<Object, Rating[]>>() {
+        @Override
         public Tuple2<Object, Rating[]> call(Tuple2<Object, Rating[]> t) {
           Rating[] scaledRatings = new Rating[t._2().length];
           for (int i = 0; i < scaledRatings.length; i++) {
             double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 
0.0);
             scaledRatings[i] = new Rating(t._2()[i].user(), 
t._2()[i].product(), newRating);
           }
-          return new Tuple2<Object, Rating[]>(t._1(), scaledRatings);
+          return new Tuple2<>(t._1(), scaledRatings);
         }
       }
     );
@@ -72,6 +74,7 @@ public class JavaRankingMetricsExample {
     // Map ratings to 1 or 0, 1 indicating a movie that should be recommended
     JavaRDD<Rating> binarizedRatings = ratings.map(
       new Function<Rating, Rating>() {
+        @Override
         public Rating call(Rating r) {
           double binaryRating;
           if (r.rating() > 0.0) {
@@ -87,6 +90,7 @@ public class JavaRankingMetricsExample {
     // Group ratings by common user
     JavaPairRDD<Object, Iterable<Rating>> userMovies = 
binarizedRatings.groupBy(
       new Function<Rating, Object>() {
+        @Override
         public Object call(Rating r) {
           return r.user();
         }
@@ -96,8 +100,9 @@ public class JavaRankingMetricsExample {
     // Get true relevant documents from all user ratings
     JavaPairRDD<Object, List<Integer>> userMoviesList = userMovies.mapValues(
       new Function<Iterable<Rating>, List<Integer>>() {
+        @Override
         public List<Integer> call(Iterable<Rating> docs) {
-          List<Integer> products = new ArrayList<Integer>();
+          List<Integer> products = new ArrayList<>();
           for (Rating r : docs) {
             if (r.rating() > 0.0) {
               products.add(r.product());
@@ -111,8 +116,9 @@ public class JavaRankingMetricsExample {
     // Extract the product id from each recommendation
     JavaPairRDD<Object, List<Integer>> userRecommendedList = 
userRecommended.mapValues(
       new Function<Rating[], List<Integer>>() {
+        @Override
         public List<Integer> call(Rating[] docs) {
-          List<Integer> products = new ArrayList<Integer>();
+          List<Integer> products = new ArrayList<>();
           for (Rating r : docs) {
             products.add(r.product());
           }
@@ -124,7 +130,7 @@ public class JavaRankingMetricsExample {
       userRecommendedList).values();
 
     // Instantiate the metrics object
-    RankingMetrics metrics = RankingMetrics.of(relevantDocs);
+    RankingMetrics<Integer> metrics = RankingMetrics.of(relevantDocs);
 
     // Precision and NDCG at k
     Integer[] kVector = {1, 3, 5};
@@ -139,6 +145,7 @@ public class JavaRankingMetricsExample {
     // Evaluate the model using numerical ratings and regression metrics
     JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
       new Function<Rating, Tuple2<Object, Object>>() {
+        @Override
         public Tuple2<Object, Object> call(Rating r) {
           return new Tuple2<Object, Object>(r.user(), r.product());
         }
@@ -147,18 +154,20 @@ public class JavaRankingMetricsExample {
     JavaPairRDD<Tuple2<Integer, Integer>, Object> predictions = 
JavaPairRDD.fromJavaRDD(
       model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
         new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
+          @Override
           public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
             return new Tuple2<Tuple2<Integer, Integer>, Object>(
-              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
+              new Tuple2<>(r.user(), r.product()), r.rating());
           }
         }
       ));
     JavaRDD<Tuple2<Object, Object>> ratesAndPreds =
       JavaPairRDD.fromJavaRDD(ratings.map(
         new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
+          @Override
           public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
             return new Tuple2<Tuple2<Integer, Integer>, Object>(
-              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
+              new Tuple2<>(r.user(), r.product()), r.rating());
           }
         }
       )).join(predictions).values();

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index 90d4737..bc963a0 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -36,6 +36,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.Time;
@@ -154,9 +155,9 @@ public final class JavaRecoverableNetworkWordCount {
         }
       });
 
-    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, 
Void>() {
+    wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, 
Time>() {
       @Override
-      public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws 
IOException {
+      public void call(JavaPairRDD<String, Integer> rdd, Time time) throws 
IOException {
         // Get or register the blacklist Broadcast
         final Broadcast<List<String>> blacklist = 
JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
         // Get or register the droppedWordsCounter Accumulator
@@ -164,7 +165,7 @@ public final class JavaRecoverableNetworkWordCount {
         // Use blacklist to drop words and use droppedWordsCounter to count 
them
         String counts = rdd.filter(new Function<Tuple2<String, Integer>, 
Boolean>() {
           @Override
-          public Boolean call(Tuple2<String, Integer> wordCount) throws 
Exception {
+          public Boolean call(Tuple2<String, Integer> wordCount) {
             if (blacklist.value().contains(wordCount._1())) {
               droppedWordsCounter.add(wordCount._2());
               return false;
@@ -178,7 +179,6 @@ public final class JavaRecoverableNetworkWordCount {
         System.out.println("Dropped " + droppedWordsCounter.value() + " 
word(s) totally");
         System.out.println("Appending to " + outputFile.getAbsolutePath());
         Files.append(output + "\n", outputFile, Charset.defaultCharset());
-        return null;
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index 3515d7b..084f68a 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.api.java.StorageLevels;
@@ -78,13 +78,14 @@ public final class JavaSqlNetworkWordCount {
     });
 
     // Convert RDDs of the words DStream to DataFrame and run SQL query
-    words.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
+    words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
       @Override
-      public Void call(JavaRDD<String> rdd, Time time) {
+      public void call(JavaRDD<String> rdd, Time time) {
         SQLContext sqlContext = 
JavaSQLContextSingleton.getInstance(rdd.context());
 
         // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
         JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, 
JavaRecord>() {
+          @Override
           public JavaRecord call(String word) {
             JavaRecord record = new JavaRecord();
             record.setWord(word);
@@ -101,7 +102,6 @@ public final class JavaSqlNetworkWordCount {
             sqlContext.sql("select word, count(*) as total from words group by 
word");
         System.out.println("========= " + time + "=========");
         wordCountsDataFrame.show();
-        return null;
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
index 030ee30..d869768 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
@@ -17,13 +17,13 @@
 
 package org.apache.spark.examples.streaming;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -33,8 +33,6 @@ import org.apache.spark.streaming.twitter.TwitterUtils;
 import scala.Tuple2;
 import twitter4j.Status;
 
-import java.io.IOException;
-import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
 
@@ -44,7 +42,7 @@ import java.util.List;
  */
 public class JavaTwitterHashTagJoinSentiments {
 
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) {
     if (args.length < 4) {
       System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer 
key> <consumer secret>" +
         " <access token> <access token secret> [<filters>]");
@@ -79,7 +77,7 @@ public class JavaTwitterHashTagJoinSentiments {
 
     JavaDStream<String> hashTags = words.filter(new Function<String, 
Boolean>() {
       @Override
-      public Boolean call(String word) throws Exception {
+      public Boolean call(String word) {
         return word.startsWith("#");
       }
     });
@@ -91,8 +89,7 @@ public class JavaTwitterHashTagJoinSentiments {
         @Override
         public Tuple2<String, Double> call(String line) {
           String[] columns = line.split("\t");
-          return new Tuple2<String, Double>(columns[0],
-            Double.parseDouble(columns[1]));
+          return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
         }
       });
 
@@ -101,7 +98,7 @@ public class JavaTwitterHashTagJoinSentiments {
         @Override
         public Tuple2<String, Integer> call(String s) {
           // leave out the # character
-          return new Tuple2<String, Integer>(s.substring(1), 1);
+          return new Tuple2<>(s.substring(1), 1);
         }
       });
 
@@ -120,9 +117,8 @@ public class JavaTwitterHashTagJoinSentiments {
       hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
         JavaPairRDD<String, Tuple2<Double, Integer>>>() {
         @Override
-        public JavaPairRDD<String, Tuple2<Double, Integer>> 
call(JavaPairRDD<String,
-          Integer> topicCount)
-          throws Exception {
+        public JavaPairRDD<String, Tuple2<Double, Integer>> call(
+            JavaPairRDD<String, Integer> topicCount) {
           return wordSentiments.join(topicCount);
         }
       });
@@ -131,9 +127,9 @@ public class JavaTwitterHashTagJoinSentiments {
       new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, 
Double>() {
         @Override
         public Tuple2<String, Double> call(Tuple2<String,
-          Tuple2<Double, Integer>> topicAndTuplePair) throws Exception {
+          Tuple2<Double, Integer>> topicAndTuplePair) {
           Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
-          return new Tuple2<String, Double>(topicAndTuplePair._1(),
+          return new Tuple2<>(topicAndTuplePair._1(),
             happinessAndCount._1() * happinessAndCount._2());
         }
       });
@@ -141,9 +137,8 @@ public class JavaTwitterHashTagJoinSentiments {
     JavaPairDStream<Double, String> happinessTopicPairs = 
topicHappiness.mapToPair(
       new PairFunction<Tuple2<String, Double>, Double, String>() {
         @Override
-        public Tuple2<Double, String> call(Tuple2<String, Double> 
topicHappiness)
-          throws Exception {
-          return new Tuple2<Double, String>(topicHappiness._2(),
+        public Tuple2<Double, String> call(Tuple2<String, Double> 
topicHappiness) {
+          return new Tuple2<>(topicHappiness._2(),
             topicHappiness._1());
         }
       });
@@ -151,17 +146,17 @@ public class JavaTwitterHashTagJoinSentiments {
     JavaPairDStream<Double, String> happiest10 = 
happinessTopicPairs.transformToPair(
       new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() 
{
         @Override
-        public JavaPairRDD<Double, String> call(JavaPairRDD<Double,
-          String> happinessAndTopics) throws Exception {
+        public JavaPairRDD<Double, String> call(
+            JavaPairRDD<Double, String> happinessAndTopics) {
           return happinessAndTopics.sortByKey(false);
         }
       }
     );
 
     // Print hash tags with the most positive sentiment values
-    happiest10.foreachRDD(new Function<JavaPairRDD<Double, String>, Void>() {
+    happiest10.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() {
       @Override
-      public Void call(JavaPairRDD<Double, String> happinessTopicPairs) throws 
Exception {
+      public void call(JavaPairRDD<Double, String> happinessTopicPairs) {
         List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
         System.out.println(
           String.format("\nHappiest topics in last 10 seconds (%s total):",
@@ -170,7 +165,6 @@ public class JavaTwitterHashTagJoinSentiments {
           System.out.println(
             String.format("%s (%s happiness)", pair._2(), pair._1()));
         }
-        return null;
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala 
b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index 04dec57..e4486b9 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -74,7 +74,7 @@ object SparkHdfsLR {
     val conf = new Configuration()
     val sc = new SparkContext(sparkConf)
     val lines = sc.textFile(inputPath)
-    val points = lines.map(parsePoint _).cache()
+    val points = lines.map(parsePoint).cache()
     val ITERATIONS = args(1).toInt
 
     // Initialize w to a random value

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala 
b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
index ddc99d3..8b739c9 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala
@@ -71,7 +71,7 @@ object SparkTachyonHdfsLR {
     val conf = new Configuration()
     val sc = new SparkContext(sparkConf)
     val lines = sc.textFile(inputPath)
-    val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP)
+    val points = lines.map(parsePoint).persist(StorageLevel.OFF_HEAP)
     val ITERATIONS = args(1).toInt
 
     // Initialize w to a random value

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index fbdfbf7..4891e4f 100644
--- 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -35,6 +35,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -130,17 +131,15 @@ public class JavaDirectKafkaStreamSuite implements 
Serializable {
     JavaDStream<String> unifiedStream = stream1.union(stream2);
 
     final Set<String> result = Collections.synchronizedSet(new 
HashSet<String>());
-    unifiedStream.foreachRDD(
-        new Function<JavaRDD<String>, Void>() {
+    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
           @Override
-          public Void call(JavaRDD<String> rdd) {
+          public void call(JavaRDD<String> rdd) {
             result.addAll(rdd.collect());
             for (OffsetRange o : offsetRanges.get()) {
               System.out.println(
                 o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + 
o.untilOffset()
               );
             }
-            return null;
           }
         }
     );

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 1e69de4..617c92a 100644
--- 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -31,6 +31,7 @@ import org.junit.Test;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -103,10 +104,9 @@ public class JavaKafkaStreamSuite implements Serializable {
       }
     );
 
-    words.countByValue().foreachRDD(
-      new Function<JavaPairRDD<String, Long>, Void>() {
+    words.countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, 
Long>>() {
         @Override
-        public Void call(JavaPairRDD<String, Long> rdd) {
+        public void call(JavaPairRDD<String, Long> rdd) {
           List<Tuple2<String, Long>> ret = rdd.collect();
           for (Tuple2<String, Long> r : ret) {
             if (result.containsKey(r._1())) {
@@ -115,8 +115,6 @@ public class JavaKafkaStreamSuite implements Serializable {
               result.put(r._1(), r._2());
             }
           }
-
-          return null;
         }
       }
     );

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 6fe24fe..78263f9 100644
--- 
a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ 
b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -137,8 +137,8 @@ abstract class KinesisStreamTests(aggregateTestData: 
Boolean) extends KinesisFun
     // Verify that the generated KinesisBackedBlockRDD has the all the right 
information
     val blockInfos = Seq(blockInfo1, blockInfo2)
     val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
-    nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
-    val kinesisRDD = 
nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
+    nonEmptyRDD shouldBe a [KinesisBackedBlockRDD[_]]
+    val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD[_]]
     assert(kinesisRDD.regionName === dummyRegionName)
     assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
     assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
@@ -203,7 +203,7 @@ abstract class KinesisStreamTests(aggregateTestData: 
Boolean) extends KinesisFun
       Seconds(10), StorageLevel.MEMORY_ONLY, addFive,
       awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
 
-    stream shouldBe a [ReceiverInputDStream[Int]]
+    stream shouldBe a [ReceiverInputDStream[_]]
 
     val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
     stream.foreachRDD { rdd =>
@@ -272,7 +272,7 @@ abstract class KinesisStreamTests(aggregateTestData: 
Boolean) extends KinesisFun
     times.foreach { time =>
       val (arrayOfSeqNumRanges, data) = collectedData(time)
       val rdd = 
recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
-      rdd shouldBe a [KinesisBackedBlockRDD[Array[Byte]]]
+      rdd shouldBe a [KinesisBackedBlockRDD[_]]
 
       // Verify the recovered sequence ranges
       val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index e47c4db..ca11ede 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Logging
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.Since
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
 import org.apache.spark.mllib.util.MLUtils
@@ -107,7 +107,7 @@ class KMeans private (
    * Number of runs of the algorithm to execute in parallel.
    */
   @Since("1.4.0")
-  @deprecated("Support for runs is deprecated. This param will have no effect 
in 1.7.0.", "1.6.0")
+  @deprecated("Support for runs is deprecated. This param will have no effect 
in 2.0.0.", "1.6.0")
   def getRuns: Int = runs
 
   /**
@@ -117,7 +117,7 @@ class KMeans private (
    * return the best clustering found over any run. Default: 1.
    */
   @Since("0.8.0")
-  @deprecated("Support for runs is deprecated. This param will have no effect 
in 1.7.0.", "1.6.0")
+  @deprecated("Support for runs is deprecated. This param will have no effect 
in 2.0.0.", "1.6.0")
   def setRuns(runs: Int): this.type = {
     if (runs <= 0) {
       throw new IllegalArgumentException("Number of runs must be positive")
@@ -431,7 +431,7 @@ class KMeans private (
           val rs = (0 until runs).filter { r =>
             rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
           }
-          if (rs.length > 0) Some(p, rs) else None
+          if (rs.length > 0) Some((p, rs)) else None
         }
       }.collect()
       mergeNewCenters()

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java 
b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
index 271dda4..a6631ed 100644
--- 
a/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/mllib/recommendation/JavaALSSuite.java
@@ -56,10 +56,10 @@ public class JavaALSSuite implements Serializable {
       double matchThreshold,
       boolean implicitPrefs,
       DoubleMatrix truePrefs) {
-    List<Tuple2<Integer, Integer>> localUsersProducts = new ArrayList(users * 
products);
+    List<Tuple2<Integer, Integer>> localUsersProducts = new ArrayList<>(users 
* products);
     for (int u=0; u < users; ++u) {
       for (int p=0; p < products; ++p) {
-        localUsersProducts.add(new Tuple2<Integer, Integer>(u, p));
+        localUsersProducts.add(new Tuple2<>(u, p));
       }
     }
     JavaPairRDD<Integer, Integer> usersProducts = 
sc.parallelizePairs(localUsersProducts);

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
 
b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
index 32c2f4f..3db9b39 100644
--- 
a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
+++ 
b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaIsotonicRegressionSuite.java
@@ -36,11 +36,11 @@ import org.apache.spark.api.java.JavaSparkContext;
 public class JavaIsotonicRegressionSuite implements Serializable {
   private transient JavaSparkContext sc;
 
-  private List<Tuple3<Double, Double, Double>> generateIsotonicInput(double[] 
labels) {
-    ArrayList<Tuple3<Double, Double, Double>> input = new 
ArrayList(labels.length);
+  private static List<Tuple3<Double, Double, Double>> 
generateIsotonicInput(double[] labels) {
+    List<Tuple3<Double, Double, Double>> input = new 
ArrayList<>(labels.length);
 
     for (int i = 1; i <= labels.length; i++) {
-      input.add(new Tuple3<Double, Double, Double>(labels[i-1], (double) i, 
1d));
+      input.add(new Tuple3<>(labels[i-1], (double) i, 1.0));
     }
 
     return input;
@@ -70,7 +70,7 @@ public class JavaIsotonicRegressionSuite implements 
Serializable {
       runIsotonicRegression(new double[]{1, 2, 3, 3, 1, 6, 7, 8, 11, 9, 10, 
12});
 
     Assert.assertArrayEquals(
-      new double[] {1, 2, 7d/3, 7d/3, 6, 7, 8, 10, 10, 12}, 
model.predictions(), 1e-14);
+      new double[] {1, 2, 7.0/3, 7.0/3, 6, 7, 8, 10, 10, 12}, 
model.predictions(), 1.0e-14);
   }
 
   @Test
@@ -81,10 +81,10 @@ public class JavaIsotonicRegressionSuite implements 
Serializable {
     JavaDoubleRDD testRDD = sc.parallelizeDoubles(Arrays.asList(0.0, 1.0, 9.5, 
12.0, 13.0));
     List<Double> predictions = model.predict(testRDD).collect();
 
-    Assert.assertTrue(predictions.get(0) == 1d);
-    Assert.assertTrue(predictions.get(1) == 1d);
-    Assert.assertTrue(predictions.get(2) == 10d);
-    Assert.assertTrue(predictions.get(3) == 12d);
-    Assert.assertTrue(predictions.get(4) == 12d);
+    Assert.assertEquals(1.0, predictions.get(0).doubleValue(), 1.0e-14);
+    Assert.assertEquals(1.0, predictions.get(1).doubleValue(), 1.0e-14);
+    Assert.assertEquals(10.0, predictions.get(2).doubleValue(), 1.0e-14);
+    Assert.assertEquals(12.0, predictions.get(3).doubleValue(), 1.0e-14);
+    Assert.assertEquals(12.0, predictions.get(4).doubleValue(), 1.0e-14);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py 
b/python/pyspark/mllib/clustering.py
index 48daa87..d22a7f4 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -173,7 +173,7 @@ class KMeans(object):
         """Train a k-means clustering model."""
         if runs != 1:
             warnings.warn(
-                "Support for runs is deprecated in 1.6.0. This param will have no effect in 1.7.0.")
+                "Support for runs is deprecated in 1.6.0. This param will have no effect in 2.0.0.")
         clusterInitialModel = []
         if initialModel is not None:
             if not isinstance(initialModel, KMeansModel):

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index f869a96..e028d22 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -57,8 +57,8 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks {
     (result, expected) match {
       case (result: Array[Byte], expected: Array[Byte]) =>
         java.util.Arrays.equals(result, expected)
-      case (result: Double, expected: Spread[Double]) =>
-        expected.isWithin(result)
+      case (result: Double, expected: Spread[Double @unchecked]) =>
+        expected.asInstanceOf[Spread[Double]].isWithin(result)
       case _ => result == expected
     }
   }
@@ -275,8 +275,8 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks {
     (result, expected) match {
       case (result: Array[Byte], expected: Array[Byte]) =>
         java.util.Arrays.equals(result, expected)
-      case (result: Double, expected: Spread[Double]) =>
-        expected.isWithin(result)
+      case (result: Double, expected: Spread[Double @unchecked]) =>
+        expected.asInstanceOf[Spread[Double]].isWithin(result)
       case (result: Double, expected: Double) if result.isNaN && 
expected.isNaN =>
         true
       case (result: Float, expected: Float) if result.isNaN && expected.isNaN 
=>

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index d5f1c4d..6745b4b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -384,9 +384,6 @@ class DateTimeUtilsSuite extends SparkFunSuite {
       Timestamp.valueOf("1700-02-28 12:14:50.123456")).foreach { t =>
       val us = fromJavaTimestamp(t)
       assert(toJavaTimestamp(us) === t)
-      assert(getHours(us) === t.getHours)
-      assert(getMinutes(us) === t.getMinutes)
-      assert(getSeconds(us) === t.getSeconds)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index f8e32d6..6bcd155 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasources.parquet;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -62,7 +63,7 @@ import org.apache.parquet.schema.Types;
 import org.apache.spark.sql.types.StructType;
 
 /**
- * Base class for custom RecordReaaders for Parquet that directly materialize 
to `T`.
+ * Base class for custom RecordReaders for Parquet that directly materialize 
to `T`.
  * This class handles computing row groups, filtering on them, setting up the 
column readers,
  * etc.
  * This is heavily based on parquet-mr's RecordReader.
@@ -83,6 +84,7 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
 
   protected ParquetFileReader reader;
 
+  @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
       throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
@@ -131,8 +133,7 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
     }
     this.fileSchema = footer.getFileMetaData().getSchema();
     Map<String, String> fileMetadata = 
footer.getFileMetaData().getKeyValueMetaData();
-    ReadSupport<T> readSupport = getReadSupportInstance(
-        (Class<? extends ReadSupport<T>>) getReadSupportClass(configuration));
+    ReadSupport<T> readSupport = 
getReadSupportInstance(getReadSupportClass(configuration));
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), 
fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
@@ -282,8 +283,9 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
     return Collections.unmodifiableMap(setMultiMap);
   }
 
-  private static Class<?> getReadSupportClass(Configuration configuration) {
-    return ConfigurationUtil.getClassFromConfig(configuration,
+  @SuppressWarnings("unchecked")
+  private Class<? extends ReadSupport<T>> getReadSupportClass(Configuration 
configuration) {
+    return (Class<? extends ReadSupport<T>>) 
ConfigurationUtil.getClassFromConfig(configuration,
         ParquetInputFormat.READ_SUPPORT_CLASS, ReadSupport.class);
   }
 
@@ -294,10 +296,9 @@ public abstract class SpecificParquetRecordReaderBase<T> 
extends RecordReader<Vo
   private static <T> ReadSupport<T> getReadSupportInstance(
       Class<? extends ReadSupport<T>> readSupportClass){
     try {
-      return readSupportClass.newInstance();
-    } catch (InstantiationException e) {
-      throw new BadConfigurationException("could not instantiate read support class", e);
-    } catch (IllegalAccessException e) {
+      return readSupportClass.getConstructor().newInstance();
+    } catch (InstantiationException | IllegalAccessException |
+             NoSuchMethodException | InvocationTargetException e) {
       throw new BadConfigurationException("could not instantiate read support class", e);
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 076db0c..eb4efcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -580,7 +580,7 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSQLContext {
     )
   }
 
-  test("sparkPartitionId") {
+  test("spark_partition_id") {
     // Make sure we have 2 partitions, each with 2 records.
     val df = sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
       Iterator(Tuple1(1), Tuple1(2))
@@ -591,7 +591,7 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSQLContext {
     )
   }
 
-  test("InputFileName") {
+  test("input_file_name") {
     withTempPath { dir =>
       val data = sparkContext.parallelize(0 to 10).toDF("id")
       data.write.parquet(dir.getCanonicalPath)

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 0e60573..fac26bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import java.util.{Locale, TimeZone}
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
@@ -206,7 +207,7 @@ abstract class QueryTest extends PlanTest {
     val jsonString = try {
       logicalPlan.toJSON
     } catch {
-      case e =>
+      case NonFatal(e) =>
         fail(
           s"""
              |Failed to parse logical plan to JSON:
@@ -231,7 +232,7 @@ abstract class QueryTest extends PlanTest {
     val jsonBackPlan = try {
       TreeNode.fromJSON[LogicalPlan](jsonString, sqlContext.sparkContext)
     } catch {
-      case e =>
+      case NonFatal(e) =>
         fail(
           s"""
              |Failed to rebuild the logical plan from JSON:

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
index 97cba1e..1529313 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala
@@ -60,6 +60,7 @@ object ColumnarTestUtils {
       case MAP(_) =>
         ArrayBasedMapData(
           Map(Random.nextInt() -> 
UTF8String.fromString(Random.nextString(Random.nextInt(32)))))
+      case _ => throw new IllegalArgumentException(s"Unknown column type $columnType")
     }).asInstanceOf[JvmType]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java 
b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 9722c60..ddc56fc 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -772,8 +772,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext 
implements Serializa
   @SuppressWarnings("unchecked")
   @Test
   public void testForeachRDD() {
-    final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
-    final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+    final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
+    final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
     List<List<Integer>> inputData = Arrays.asList(
         Arrays.asList(1,1,1),
         Arrays.asList(1,1,1));

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java 
b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index bc4bc2e..20e2a1c 100644
--- 
a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -26,10 +27,10 @@ import java.util.Set;
 import scala.Tuple2;
 
 import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.util.ManualClock;
 import org.junit.Assert;
@@ -51,10 +52,8 @@ public class JavaMapWithStateSuite extends 
LocalJavaStreamingContext implements
     JavaPairRDD<String, Boolean> initialRDD = null;
     JavaPairDStream<String, Integer> wordsDstream = null;
 
-    final Function4<Time, String, Optional<Integer>, State<Boolean>, 
Optional<Double>>
-        mappingFunc =
+    Function4<Time, String, Optional<Integer>, State<Boolean>, 
Optional<Double>> mappingFunc =
         new Function4<Time, String, Optional<Integer>, State<Boolean>, 
Optional<Double>>() {
-
           @Override
           public Optional<Double> call(
               Time time, String word, Optional<Integer> one, State<Boolean> 
state) {
@@ -76,11 +75,10 @@ public class JavaMapWithStateSuite extends 
LocalJavaStreamingContext implements
                 .partitioner(new HashPartitioner(10))
                 .timeout(Durations.seconds(10)));
 
-    JavaPairDStream<String, Boolean> stateSnapshots = 
stateDstream.stateSnapshots();
+    stateDstream.stateSnapshots();
 
-    final Function3<String, Optional<Integer>, State<Boolean>, Double> 
mappingFunc2 =
+    Function3<String, Optional<Integer>, State<Boolean>, Double> mappingFunc2 =
         new Function3<String, Optional<Integer>, State<Boolean>, Double>() {
-
           @Override
           public Double call(String key, Optional<Integer> one, State<Boolean> 
state) {
             // Use all State's methods here
@@ -95,13 +93,13 @@ public class JavaMapWithStateSuite extends 
LocalJavaStreamingContext implements
 
     JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
         wordsDstream.mapWithState(
-            StateSpec.<String, Integer, Boolean, Double>function(mappingFunc2)
+            StateSpec.function(mappingFunc2)
                 .initialState(initialRDD)
                 .numPartitions(10)
                 .partitioner(new HashPartitioner(10))
                 .timeout(Durations.seconds(10)));
 
-    JavaPairDStream<String, Boolean> stateSnapshots2 = 
stateDstream2.stateSnapshots();
+    stateDstream2.stateSnapshots();
   }
 
   @Test
@@ -126,33 +124,21 @@ public class JavaMapWithStateSuite extends 
LocalJavaStreamingContext implements
         Collections.<Integer>emptySet()
     );
 
+    @SuppressWarnings("unchecked")
     List<Set<Tuple2<String, Integer>>> stateData = Arrays.asList(
         Collections.<Tuple2<String, Integer>>emptySet(),
-        Sets.newHashSet(new Tuple2<String, Integer>("a", 1)),
-        Sets.newHashSet(new Tuple2<String, Integer>("a", 2), new 
Tuple2<String, Integer>("b", 1)),
-        Sets.newHashSet(
-            new Tuple2<String, Integer>("a", 3),
-            new Tuple2<String, Integer>("b", 2),
-            new Tuple2<String, Integer>("c", 1)),
-        Sets.newHashSet(
-            new Tuple2<String, Integer>("a", 4),
-            new Tuple2<String, Integer>("b", 3),
-            new Tuple2<String, Integer>("c", 1)),
-        Sets.newHashSet(
-            new Tuple2<String, Integer>("a", 5),
-            new Tuple2<String, Integer>("b", 3),
-            new Tuple2<String, Integer>("c", 1)),
-        Sets.newHashSet(
-            new Tuple2<String, Integer>("a", 5),
-            new Tuple2<String, Integer>("b", 3),
-            new Tuple2<String, Integer>("c", 1))
+        Sets.newHashSet(new Tuple2<>("a", 1)),
+        Sets.newHashSet(new Tuple2<>("a", 2), new Tuple2<>("b", 1)),
+        Sets.newHashSet(new Tuple2<>("a", 3), new Tuple2<>("b", 2), new 
Tuple2<>("c", 1)),
+        Sets.newHashSet(new Tuple2<>("a", 4), new Tuple2<>("b", 3), new 
Tuple2<>("c", 1)),
+        Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new 
Tuple2<>("c", 1)),
+        Sets.newHashSet(new Tuple2<>("a", 5), new Tuple2<>("b", 3), new 
Tuple2<>("c", 1))
     );
 
     Function3<String, Optional<Integer>, State<Integer>, Integer> mappingFunc =
         new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
-
           @Override
-          public Integer call(String key, Optional<Integer> value, 
State<Integer> state) throws Exception {
+          public Integer call(String key, Optional<Integer> value, 
State<Integer> state) {
             int sum = value.or(0) + (state.exists() ? state.get() : 0);
             state.update(sum);
             return sum;
@@ -160,7 +146,7 @@ public class JavaMapWithStateSuite extends 
LocalJavaStreamingContext implements
         };
     testOperation(
         inputData,
-        StateSpec.<String, Integer, Integer, Integer>function(mappingFunc),
+        StateSpec.function(mappingFunc),
         outputData,
         stateData);
   }
@@ -175,27 +161,25 @@ public class JavaMapWithStateSuite extends 
LocalJavaStreamingContext implements
     JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
         JavaPairDStream.fromJavaDStream(inputStream.map(new Function<K, 
Tuple2<K, Integer>>() {
           @Override
-          public Tuple2<K, Integer> call(K x) throws Exception {
-            return new Tuple2<K, Integer>(x, 1);
+          public Tuple2<K, Integer> call(K x) {
+            return new Tuple2<>(x, 1);
           }
         })).mapWithState(mapWithStateSpec);
 
     final List<Set<T>> collectedOutputs =
-        Collections.synchronizedList(Lists.<Set<T>>newArrayList());
-    mapWithStateDStream.foreachRDD(new Function<JavaRDD<T>, Void>() {
+        Collections.synchronizedList(new ArrayList<Set<T>>());
+    mapWithStateDStream.foreachRDD(new VoidFunction<JavaRDD<T>>() {
       @Override
-      public Void call(JavaRDD<T> rdd) throws Exception {
+      public void call(JavaRDD<T> rdd) {
         collectedOutputs.add(Sets.newHashSet(rdd.collect()));
-        return null;
       }
     });
     final List<Set<Tuple2<K, S>>> collectedStateSnapshots =
-        Collections.synchronizedList(Lists.<Set<Tuple2<K, S>>>newArrayList());
-    mapWithStateDStream.stateSnapshots().foreachRDD(new 
Function<JavaPairRDD<K, S>, Void>() {
+        Collections.synchronizedList(new ArrayList<Set<Tuple2<K, S>>>());
+    mapWithStateDStream.stateSnapshots().foreachRDD(new 
VoidFunction<JavaPairRDD<K, S>>() {
       @Override
-      public Void call(JavaPairRDD<K, S> rdd) throws Exception {
+      public void call(JavaPairRDD<K, S> rdd) {
         collectedStateSnapshots.add(Sets.newHashSet(rdd.collect()));
-        return null;
       }
     });
     BatchCounter batchCounter = new BatchCounter(ssc.ssc());

http://git-wip-us.apache.org/repos/asf/spark/blob/b9c83533/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java 
b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index 7a8ef9d..d09258e 100644
--- 
a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -18,13 +18,14 @@
 package org.apache.spark.streaming;
 
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import static org.junit.Assert.*;
 
 import com.google.common.io.Closeables;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -68,12 +69,11 @@ public class JavaReceiverAPISuite implements Serializable {
           return v1 + ".";
         }
       });
-      mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
+      mapped.foreachRDD(new VoidFunction<JavaRDD<String>>() {
         @Override
-        public Void call(JavaRDD<String> rdd) {
+        public void call(JavaRDD<String> rdd) {
           long count = rdd.count();
           dataCounter.addAndGet(count);
-          return null;
         }
       });
 
@@ -90,7 +90,7 @@ public class JavaReceiverAPISuite implements Serializable {
         Thread.sleep(100);
       }
       ssc.stop();
-      assertTrue(dataCounter.get() > 0);
+      Assert.assertTrue(dataCounter.get() > 0);
     } finally {
       server.stop();
     }
@@ -98,8 +98,8 @@ public class JavaReceiverAPISuite implements Serializable {
 
   private static class JavaSocketReceiver extends Receiver<String> {
 
-    String host = null;
-    int port = -1;
+    private String host = null;
+    private int port = -1;
 
     JavaSocketReceiver(String host_ , int port_) {
       super(StorageLevel.MEMORY_AND_DISK());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to