Repository: spark
Updated Branches:
  refs/heads/master ba8912e5f -> de14d35f7


http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
index f69aa4b..1ee68da 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.recommendation.ALS;
 import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
 import org.apache.spark.mllib.recommendation.Rating;
@@ -37,15 +36,12 @@ public class JavaRecommendationExample {
     // Load and parse the data
     String path = "data/mllib/als/test.data";
     JavaRDD<String> data = jsc.textFile(path);
-    JavaRDD<Rating> ratings = data.map(
-      new Function<String, Rating>() {
-        public Rating call(String s) {
-          String[] sarray = s.split(",");
-          return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
-            Double.parseDouble(sarray[2]));
-        }
-      }
-    );
+    JavaRDD<Rating> ratings = data.map(s -> {
+      String[] sarray = s.split(",");
+      return new Rating(Integer.parseInt(sarray[0]),
+        Integer.parseInt(sarray[1]),
+        Double.parseDouble(sarray[2]));
+    });
 
     // Build the recommendation model using ALS
     int rank = 10;
@@ -53,37 +49,19 @@ public class JavaRecommendationExample {
     MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
 
     // Evaluate the model on rating data
-    JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
-      new Function<Rating, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(Rating r) {
-          return new Tuple2<Object, Object>(r.user(), r.product());
-        }
-      }
-    );
+    JavaRDD<Tuple2<Object, Object>> userProducts =
+      ratings.map(r -> new Tuple2<>(r.user(), r.product()));
     JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
-      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
-        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
-          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
-            return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
-          }
-        }
-      ));
-    JavaRDD<Tuple2<Double, Double>> ratesAndPreds =
-      JavaPairRDD.fromJavaRDD(ratings.map(
-        new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
-          public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r){
-            return new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating());
-          }
-        }
-      )).join(predictions).values();
-    double MSE = JavaDoubleRDD.fromRDD(ratesAndPreds.map(
-      new Function<Tuple2<Double, Double>, Object>() {
-        public Object call(Tuple2<Double, Double> pair) {
-          Double err = pair._1() - pair._2();
-          return err * err;
-        }
-      }
-    ).rdd()).mean();
+      model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
+          .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
+    );
+    JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
+        ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
+      .join(predictions).values();
+    double MSE = ratesAndPreds.mapToDouble(pair -> {
+      double err = pair._1() - pair._2();
+      return err * err;
+    }).mean();
     System.out.println("Mean Squared Error = " + MSE);
 
     // Save and load model
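
The change above follows the pattern used throughout this commit: Spark's Java function interfaces (Function, PairFunction, DoubleFunction, and so on) each declare a single abstract call method, so an anonymous inner class can be replaced by a lambda of the same shape. A minimal sketch of the two idioms touched in this file (a sketch only; variable names mirror the example but are not meant verbatim):

    // Function<String, Rating> has one abstract method, call(String), so a lambda fits.
    JavaRDD<Rating> ratings = data.map(s -> {
      String[] sarray = s.split(",");
      return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
        Double.parseDouble(sarray[2]));
    });

    // mapToDouble takes a DoubleFunction<T> and already returns a JavaDoubleRDD,
    // so the old JavaDoubleRDD.fromRDD(... .rdd()) wrapping is unnecessary before mean().
    double MSE = ratesAndPreds.mapToDouble(pair -> {
      double err = pair._1() - pair._2();
      return err * err;
    }).mean();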

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
index b3e5c04..7bb9993 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRegressionMetricsExample.java
@@ -21,7 +21,6 @@ package org.apache.spark.examples.mllib;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.mllib.regression.LinearRegressionModel;
@@ -38,34 +37,24 @@ public class JavaRegressionMetricsExample {
     // Load and parse the data
     String path = "data/mllib/sample_linear_regression_data.txt";
     JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<LabeledPoint> parsedData = data.map(
-      new Function<String, LabeledPoint>() {
-        public LabeledPoint call(String line) {
-          String[] parts = line.split(" ");
-          double[] v = new double[parts.length - 1];
-          for (int i = 1; i < parts.length - 1; i++) {
-            v[i - 1] = Double.parseDouble(parts[i].split(":")[1]);
-          }
-          return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
-        }
+    JavaRDD<LabeledPoint> parsedData = data.map(line -> {
+      String[] parts = line.split(" ");
+      double[] v = new double[parts.length - 1];
+      for (int i = 1; i < parts.length - 1; i++) {
+        v[i - 1] = Double.parseDouble(parts[i].split(":")[1]);
       }
-    );
+      return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
+    });
     parsedData.cache();
 
     // Building the model
     int numIterations = 100;
-    final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData),
+    LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData),
       numIterations);
 
     // Evaluate model on training examples and compute training error
-    JavaRDD<Tuple2<Object, Object>> valuesAndPreds = parsedData.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint point) {
-          double prediction = model.predict(point.features());
-          return new Tuple2<Object, Object>(prediction, point.label());
-        }
-      }
-    );
+    JavaPairRDD<Object, Object> valuesAndPreds = parsedData.mapToPair(point ->
+      new Tuple2<>(model.predict(point.features()), point.label()));
 
     // Instantiate metrics object
     RegressionMetrics metrics = new RegressionMetrics(valuesAndPreds.rdd());

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
index 720b167..866a221 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java
@@ -24,7 +24,6 @@ import org.apache.spark.SparkContext;
 import scala.Tuple2;
 
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.classification.SVMModel;
 import org.apache.spark.mllib.classification.SVMWithSGD;
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
@@ -50,20 +49,14 @@ public class JavaSVMWithSGDExample {
 
     // Run training algorithm to build the model.
     int numIterations = 100;
-    final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);
+    SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);
 
     // Clear the default threshold.
     model.clearThreshold();
 
     // Compute raw scores on the test set.
-    JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(
-      new Function<LabeledPoint, Tuple2<Object, Object>>() {
-        public Tuple2<Object, Object> call(LabeledPoint p) {
-          Double score = model.predict(p.features());
-          return new Tuple2<Object, Object>(score, p.label());
-        }
-      }
-    );
+    JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(p ->
+      new Tuple2<>(model.predict(p.features()), p.label()));
 
     // Get evaluation metrics.
     BinaryClassificationMetrics metrics =

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
index 7f4fe60..f9198e7 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java
@@ -23,9 +23,6 @@ import java.util.List;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-// $example off$
-import org.apache.spark.api.java.function.Function;
-// $example on$
 import org.apache.spark.mllib.fpm.AssociationRules;
 import org.apache.spark.mllib.fpm.FPGrowth;
 import org.apache.spark.mllib.fpm.FPGrowthModel;
@@ -42,14 +39,7 @@ public class JavaSimpleFPGrowth {
     // $example on$
     JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");
 
-    JavaRDD<List<String>> transactions = data.map(
-      new Function<String, List<String>>() {
-        public List<String> call(String line) {
-          String[] parts = line.split(" ");
-          return Arrays.asList(parts);
-        }
-      }
-    );
+    JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));
 
     FPGrowth fpg = new FPGrowth()
       .setMinSupport(0.2)

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
index cfaa577..4be702c 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -17,10 +17,6 @@
 
 package org.apache.spark.examples.mllib;
 
-
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
 // $example on$
 import org.apache.spark.mllib.stat.test.BinarySample;
 import org.apache.spark.mllib.stat.test.StreamingTest;
@@ -75,16 +71,12 @@ public class JavaStreamingTestExample {
     ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString());
 
     // $example on$
-    JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
-      new Function<String, BinarySample>() {
-        @Override
-        public BinarySample call(String line) {
-          String[] ts = line.split(",");
-          boolean label = Boolean.parseBoolean(ts[0]);
-          double value = Double.parseDouble(ts[1]);
-          return new BinarySample(label, value);
-        }
-      });
+    JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(line -> {
+      String[] ts = line.split(",");
+      boolean label = Boolean.parseBoolean(ts[0]);
+      double value = Double.parseDouble(ts[1]);
+      return new BinarySample(label, value);
+    });
 
     StreamingTest streamingTest = new StreamingTest()
       .setPeacePeriod(0)
@@ -98,21 +90,11 @@ public class JavaStreamingTestExample {
     // Stop processing if test becomes significant or we time out
     timeoutCounter = numBatchesTimeout;
 
-    out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
-      @Override
-      public void call(JavaRDD<StreamingTestResult> rdd) {
-        timeoutCounter -= 1;
-
-        boolean anySignificant = !rdd.filter(new Function<StreamingTestResult, Boolean>() {
-          @Override
-          public Boolean call(StreamingTestResult v) {
-            return v.pValue() < 0.05;
-          }
-        }).isEmpty();
-
-        if (timeoutCounter <= 0 || anySignificant) {
-          rdd.context().stop();
-        }
+    out.foreachRDD(rdd -> {
+      timeoutCounter -= 1;
+      boolean anySignificant = !rdd.filter(v -> v.pValue() < 0.05).isEmpty();
+      if (timeoutCounter <= 0 || anySignificant) {
+        rdd.context().stop();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index b687fae..adb96dd 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -139,11 +139,9 @@ public class JavaSQLDataSourceExample {
     // Parquet files can also be used to create a temporary view and then used in SQL statements
     parquetFileDF.createOrReplaceTempView("parquetFile");
     Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
-    Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
-      public String call(Row row) {
-        return "Name: " + row.getString(0);
-      }
-    }, Encoders.STRING());
+    Dataset<String> namesDS = namesDF.map(
+        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+        Encoders.STRING());
     namesDS.show();
     // +------------+
     // |       value|
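
One detail worth calling out: the lambda is cast to MapFunction<Row, String> rather than passed bare. Dataset.map is overloaded for the Scala function type as well as for Java's MapFunction, so once a lambda could satisfy either overload the call becomes ambiguous to javac; the cast pins the Java overload, and the Encoder argument is unchanged. The shape, reusing the names from the diff above:

    Dataset<String> namesDS = namesDF.map(
        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
        Encoders.STRING());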

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
index c5770d1..8605852 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
@@ -227,12 +227,9 @@ public class JavaSparkSQLExample {
     // Encoders for most common types are provided in class Encoders
     Encoder<Integer> integerEncoder = Encoders.INT();
     Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
-    Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
-      @Override
-      public Integer call(Integer value) throws Exception {
-        return value + 1;
-      }
-    }, integerEncoder);
+    Dataset<Integer> transformedDS = primitiveDS.map(
+        (MapFunction<Integer, Integer>) value -> value + 1,
+        integerEncoder);
     transformedDS.collect(); // Returns [2, 3, 4]
 
     // DataFrames can be converted to a Dataset by providing a class. Mapping based on name
@@ -255,15 +252,12 @@ public class JavaSparkSQLExample {
     JavaRDD<Person> peopleRDD = spark.read()
       .textFile("examples/src/main/resources/people.txt")
       .javaRDD()
-      .map(new Function<String, Person>() {
-        @Override
-        public Person call(String line) throws Exception {
-          String[] parts = line.split(",");
-          Person person = new Person();
-          person.setName(parts[0]);
-          person.setAge(Integer.parseInt(parts[1].trim()));
-          return person;
-        }
+      .map(line -> {
+        String[] parts = line.split(",");
+        Person person = new Person();
+        person.setName(parts[0]);
+        person.setAge(Integer.parseInt(parts[1].trim()));
+        return person;
       });
 
     // Apply a schema to an RDD of JavaBeans to get a DataFrame
@@ -276,12 +270,9 @@ public class JavaSparkSQLExample {
 
     // The columns of a row in the result can be accessed by field index
     Encoder<String> stringEncoder = Encoders.STRING();
-    Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
-      @Override
-      public String call(Row row) throws Exception {
-        return "Name: " + row.getString(0);
-      }
-    }, stringEncoder);
+    Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
+        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+        stringEncoder);
     teenagerNamesByIndexDF.show();
     // +------------+
     // |       value|
@@ -290,12 +281,9 @@ public class JavaSparkSQLExample {
     // +------------+
 
     // or by field name
-    Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
-      @Override
-      public String call(Row row) throws Exception {
-        return "Name: " + row.<String>getAs("name");
-      }
-    }, stringEncoder);
+    Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
+        (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
+        stringEncoder);
     teenagerNamesByFieldDF.show();
     // +------------+
     // |       value|
@@ -324,12 +312,9 @@ public class JavaSparkSQLExample {
     StructType schema = DataTypes.createStructType(fields);
 
     // Convert records of the RDD (people) to Rows
-    JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
-      @Override
-      public Row call(String record) throws Exception {
-        String[] attributes = record.split(",");
-        return RowFactory.create(attributes[0], attributes[1].trim());
-      }
+    JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
+      String[] attributes = record.split(",");
+      return RowFactory.create(attributes[0], attributes[1].trim());
     });
 
     // Apply the schema to the RDD
@@ -343,12 +328,9 @@ public class JavaSparkSQLExample {
 
     // The results of SQL queries are DataFrames and support all the normal RDD operations
     // The columns of a row in the result can be accessed by field index or by field name
-    Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
-      @Override
-      public String call(Row row) throws Exception {
-        return "Name: " + row.getString(0);
-      }
-    }, Encoders.STRING());
+    Dataset<String> namesDS = results.map(
+        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+        Encoders.STRING());
     namesDS.show();
     // +-------------+
     // |        value|

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
index 2fe1307..4763856 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java
@@ -90,12 +90,9 @@ public class JavaSparkHiveExample {
     Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
 
     // The items in DataFrames are of type Row, which lets you to access each column by ordinal.
-    Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() {
-      @Override
-      public String call(Row row) throws Exception {
-        return "Key: " + row.get(0) + ", Value: " + row.get(1);
-      }
-    }, Encoders.STRING());
+    Dataset<String> stringsDS = sqlDF.map(
+        (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
+        Encoders.STRING());
     stringsDS.show();
     // +--------------------+
     // |               value|

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
index 0f45cfe..4e02719 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java
@@ -25,7 +25,6 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.StreamingQuery;
 
 import java.util.Arrays;
-import java.util.Iterator;
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
@@ -78,12 +77,9 @@ public final class JavaStructuredKafkaWordCount {
       .as(Encoders.STRING());
 
     // Generate running word count
-    Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(x.split(" ")).iterator();
-      }
-    }, Encoders.STRING()).groupBy("value").count();
+    Dataset<Row> wordCounts = lines.flatMap(
+        (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
+        Encoders.STRING()).groupBy("value").count();
 
     // Start running the query that prints the running counts to the console
     StreamingQuery query = wordCounts.writeStream()

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 5f342e1..3af7869 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -21,7 +21,6 @@ import org.apache.spark.sql.*;
 import org.apache.spark.sql.streaming.StreamingQuery;
 
 import java.util.Arrays;
-import java.util.Iterator;
 
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network.
@@ -61,13 +60,9 @@ public final class JavaStructuredNetworkWordCount {
       .load();
 
     // Split the lines into words
-    Dataset<String> words = lines.as(Encoders.STRING())
-      .flatMap(new FlatMapFunction<String, String>() {
-        @Override
-        public Iterator<String> call(String x) {
-          return Arrays.asList(x.split(" ")).iterator();
-        }
-    }, Encoders.STRING());
+    Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
+        (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
+        Encoders.STRING());
 
     // Generate running word count
     Dataset<Row> wordCounts = words.groupBy("value").count();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
index 172d053..93ec5e2 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -18,13 +18,11 @@ package org.apache.spark.examples.sql.streaming;
 
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.*;
-import org.apache.spark.sql.functions;
 import org.apache.spark.sql.streaming.StreamingQuery;
 import scala.Tuple2;
 
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -86,16 +84,12 @@ public final class JavaStructuredNetworkWordCountWindowed {
     // Split the lines into words, retaining timestamps
     Dataset<Row> words = lines
       .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
-      .flatMap(
-        new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
-          @Override
-          public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
-            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
-            for (String word : t._1.split(" ")) {
-              result.add(new Tuple2<>(word, t._2));
-            }
-            return result.iterator();
+      .flatMap((FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>) t -> {
+          List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+          for (String word : t._1.split(" ")) {
+            result.add(new Tuple2<>(word, t._2));
           }
+          return result.iterator();
         },
         Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
       ).toDF("word", "timestamp");
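
The flatMap calls need the same treatment as map: Dataset.flatMap is also overloaded for the Scala and Java function types, so the (FlatMapFunction<...>) cast keeps the lambda unambiguous, and FlatMapFunction.call returns an Iterator, which is why each of these lambdas ends in .iterator(). The simpler, non-windowed form from the previous file illustrates the shape:

    Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
        (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
        Encoders.STRING());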

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index e20b94d..47692ec 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -20,9 +20,6 @@ package org.apache.spark.examples.streaming;
 import com.google.common.io.Closeables;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -38,7 +35,6 @@ import java.net.ConnectException;
 import java.net.Socket;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.regex.Pattern;
 
 /**
@@ -74,23 +70,9 @@ public class JavaCustomReceiver extends Receiver<String> {
     // words in input stream of \n delimited text (eg. generated by 'nc')
     JavaReceiverInputDStream<String> lines = ssc.receiverStream(
       new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
 
     wordCounts.print();
     ssc.start();
@@ -108,15 +90,13 @@ public class JavaCustomReceiver extends Receiver<String> {
     port = port_;
   }
 
+  @Override
   public void onStart() {
     // Start the thread that receives data over a connection
-    new Thread()  {
-      @Override public void run() {
-        receive();
-      }
-    }.start();
+    new Thread(this::receive).start();
   }
 
+  @Override
   public void onStop() {
     // There is nothing much to do as the thread calling receive()
     // is designed to stop by itself isStopped() returns false
@@ -127,13 +107,13 @@ public class JavaCustomReceiver extends Receiver<String> {
     try {
       Socket socket = null;
       BufferedReader reader = null;
-      String userInput = null;
       try {
         // connect to the server
         socket = new Socket(host, port);
         reader = new BufferedReader(
            new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
         // Until stopped or connection broken continue reading
+        String userInput;
         while (!isStopped() && (userInput = reader.readLine()) != null) {
           System.out.println("Received data '" + userInput + "'");
           store(userInput);
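
Besides the lambda conversions, the receiver now starts its worker thread with a method reference: receive() takes no arguments and returns void, so this::receive matches Runnable and can be handed straight to the Thread(Runnable) constructor in place of the anonymous Thread subclass:

    // Previously: new Thread() { @Override public void run() { receive(); } }.start();
    new Thread(this::receive).start();

The userInput declaration also moves inside the inner try block, narrowing it to the scope where it is read, and the overridden onStart()/onStop() methods gain @Override annotations.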

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index ed118f8..5e5ae62 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -20,7 +20,6 @@ package org.apache.spark.examples.streaming;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -30,7 +29,6 @@ import scala.Tuple2;
 import kafka.serializer.StringDecoder;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.*;
 import org.apache.spark.streaming.api.java.*;
 import org.apache.spark.streaming.kafka.KafkaUtils;
 import org.apache.spark.streaming.Durations;
@@ -82,31 +80,10 @@ public final class JavaDirectKafkaWordCount {
     );
 
     // Get the lines, split them into words, count the words and print
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(
-        new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaDStream<String> lines = messages.map(Tuple2::_2);
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
     wordCounts.print();
 
     // Start the computation
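
Tuple2::_2 is an unbound method reference: for each (key, value) message it calls tuple._2(), exactly what the removed anonymous Function did. The rest of the word count is the usual lambda chain; reassembled from the lines above:

    JavaDStream<String> lines = messages.map(Tuple2::_2);
    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
    JavaPairDStream<String, Integer> wordCounts = words
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((i1, i2) -> i1 + i2);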

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
index 33c0a2d..0c65104 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.streaming;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.*;
 import org.apache.spark.streaming.api.java.*;
 import org.apache.spark.streaming.flume.FlumeUtils;
@@ -62,12 +61,7 @@ public final class JavaFlumeEventCount {
 
     flumeStream.count();
 
-    flumeStream.count().map(new Function<Long, String>() {
-      @Override
-      public String call(Long in) {
-        return "Received " + in + " flume events.";
-      }
-    }).print();
+    flumeStream.count().map(in -> "Received " + in + " flume events.").print();
 
     ssc.start();
     ssc.awaitTermination();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 8a5fd53..ce5acdc 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.regex.Pattern;
@@ -26,10 +25,6 @@ import java.util.regex.Pattern;
 import scala.Tuple2;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -78,32 +73,12 @@ public final class JavaKafkaWordCount {
     JavaPairReceiverInputDStream<String, String> messages =
             KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
 
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
+    JavaDStream<String> lines = messages.map(Tuple2::_2);
 
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
 
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
 
     wordCounts.print();
     jssc.start();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 7a8fe99..b217672 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -18,15 +18,11 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.StorageLevels;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -66,24 +62,9 @@ public final class JavaNetworkWordCount {
     // Replication necessary in distributed scenario for fault tolerance.
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
            args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
 
     wordCounts.print();
     ssc.start();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
index 62413b4..e86f8ab 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
@@ -17,19 +17,15 @@
 
 package org.apache.spark.examples.streaming;
 
-
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
 import scala.Tuple2;
 
-import com.google.common.collect.Lists;
-
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -49,14 +45,14 @@ public final class JavaQueueStream {
 
     // Create the queue through which RDDs can be pushed to
     // a QueueInputDStream
-    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
 
     // Create and push some RDDs into the queue
-    List<Integer> list = Lists.newArrayList();
+    List<Integer> list = new ArrayList<>();
     for (int i = 0; i < 1000; i++) {
       list.add(i);
     }
 
+    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<>();
     for (int i = 0; i < 30; i++) {
       rddQueue.add(ssc.sparkContext().parallelize(list));
     }
@@ -64,19 +60,9 @@ public final class JavaQueueStream {
     // Create the QueueInputDStream and use it do some processing
     JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
     JavaPairDStream<Integer, Integer> mappedStream = inputStream.mapToPair(
-        new PairFunction<Integer, Integer, Integer>() {
-          @Override
-          public Tuple2<Integer, Integer> call(Integer i) {
-            return new Tuple2<>(i % 10, 1);
-          }
-        });
+        i -> new Tuple2<>(i % 10, 1));
     JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-    });
+        (i1, i2) -> i1 + i2);
 
     reducedStream.print();
     ssc.start();
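
With the diamond operator there is no need for Guava's Lists.newArrayList() just to infer the element type, so the Guava import goes away in favour of java.util.ArrayList:

    List<Integer> list = new ArrayList<>();   // previously: Lists.newArrayList()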

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index acbc345..45a876d 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -18,10 +18,8 @@
 package org.apache.spark.examples.streaming;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -30,12 +28,10 @@ import scala.Tuple2;
 import com.google.common.io.Files;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -120,7 +116,7 @@ public final class JavaRecoverableNetworkWordCount {
     // If you do not see this printed, that means the StreamingContext has been loaded
     // from the new checkpoint
     System.out.println("Creating new context");
-    final File outputFile = new File(outputPath);
+    File outputFile = new File(outputPath);
     if (outputFile.exists()) {
       outputFile.delete();
     }
@@ -132,52 +128,31 @@ public final class JavaRecoverableNetworkWordCount {
     // Create a socket stream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+        .reduceByKey((i1, i2) -> i1 + i2);
+
+    wordCounts.foreachRDD((rdd, time) -> {
+      // Get or register the blacklist Broadcast
+      Broadcast<List<String>> blacklist =
+          JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+      // Get or register the droppedWordsCounter Accumulator
+      LongAccumulator droppedWordsCounter =
+          JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+      // Use blacklist to drop words and use droppedWordsCounter to count them
+      String counts = rdd.filter(wordCount -> {
+        if (blacklist.value().contains(wordCount._1())) {
+          droppedWordsCounter.add(wordCount._2());
+          return false;
+        } else {
+          return true;
         }
-      });
-
-    wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
-      @Override
-      public void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
-        // Get or register the blacklist Broadcast
-        final Broadcast<List<String>> blacklist =
-            JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
-        // Get or register the droppedWordsCounter Accumulator
-        final LongAccumulator droppedWordsCounter =
-            JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
-        // Use blacklist to drop words and use droppedWordsCounter to count them
-        String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
-          @Override
-          public Boolean call(Tuple2<String, Integer> wordCount) {
-            if (blacklist.value().contains(wordCount._1())) {
-              droppedWordsCounter.add(wordCount._2());
-              return false;
-            } else {
-              return true;
-            }
-          }
-        }).collect().toString();
-        String output = "Counts at time " + time + " " + counts;
-        System.out.println(output);
-        System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
-        System.out.println("Appending to " + outputFile.getAbsolutePath());
-        Files.append(output + "\n", outputFile, Charset.defaultCharset());
-      }
+      }).collect().toString();
+      String output = "Counts at time " + time + " " + counts;
+      System.out.println(output);
+      System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
+      System.out.println("Appending to " + outputFile.getAbsolutePath());
+      Files.append(output + "\n", outputFile, Charset.defaultCharset());
     });
 
     return ssc;
@@ -198,19 +173,15 @@ public final class JavaRecoverableNetworkWordCount {
       System.exit(1);
     }
 
-    final String ip = args[0];
-    final int port = Integer.parseInt(args[1]);
-    final String checkpointDirectory = args[2];
-    final String outputPath = args[3];
+    String ip = args[0];
+    int port = Integer.parseInt(args[1]);
+    String checkpointDirectory = args[2];
+    String outputPath = args[3];
 
     // Function to create JavaStreamingContext without any output operations
     // (used to detect the new context)
-    Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {
-      @Override
-      public JavaStreamingContext call() {
-        return createContext(ip, port, checkpointDirectory, outputPath);
-      }
-    };
+    Function0<JavaStreamingContext> createContextFunc =
+        () -> createContext(ip, port, checkpointDirectory, outputPath);
 
     JavaStreamingContext ssc =
       JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
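
Two things happen in this file beyond the usual conversion. First, foreachRDD's VoidFunction2<JavaPairRDD<String, Integer>, Time> is a single-abstract-method type, so it becomes a two-argument (rdd, time) -> { ... } lambda. Second, lambdas can capture any effectively final local, so the explicit final modifiers on ip, port, outputPath, outputFile and the broadcast/accumulator references can be dropped without changing behaviour. A trimmed sketch of the shape (the body is abbreviated relative to the diff above):

    File outputFile = new File(outputPath);   // effectively final, no modifier needed
    wordCounts.foreachRDD((rdd, time) -> {
      String output = "Counts at time " + time + " " + rdd.collect();
      Files.append(output + "\n", outputFile, Charset.defaultCharset());
    });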

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index b8e9e12..948d1a2 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -18,20 +18,15 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.api.java.StorageLevels;
 import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -48,7 +43,6 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
  * and then run the example
  *    `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999`
  */
-
 public final class JavaSqlNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
@@ -70,39 +64,28 @@ public final class JavaSqlNetworkWordCount {
     // Replication necessary in distributed scenario for fault tolerance.
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
         args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
 
     // Convert RDDs of the words DStream to DataFrame and run SQL query
-    words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
-      @Override
-      public void call(JavaRDD<String> rdd, Time time) {
-        SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
+    words.foreachRDD((rdd, time) -> {
+      SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
 
-        // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
-        JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
-          @Override
-          public JavaRecord call(String word) {
-            JavaRecord record = new JavaRecord();
-            record.setWord(word);
-            return record;
-          }
-        });
-        Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
+      // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
+      JavaRDD<JavaRecord> rowRDD = rdd.map(word -> {
+        JavaRecord record = new JavaRecord();
+        record.setWord(word);
+        return record;
+      });
+      Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);
 
-        // Creates a temporary view using the DataFrame
-        wordsDataFrame.createOrReplaceTempView("words");
+      // Creates a temporary view using the DataFrame
+      wordsDataFrame.createOrReplaceTempView("words");
 
-        // Do word count on table using SQL and print it
-        Dataset<Row> wordCountsDataFrame =
-            spark.sql("select word, count(*) as total from words group by word");
-        System.out.println("========= " + time + "=========");
-        wordCountsDataFrame.show();
-      }
+      // Do word count on table using SQL and print it
+      Dataset<Row> wordCountsDataFrame =
+          spark.sql("select word, count(*) as total from words group by word");
+      System.out.println("========= " + time + "=========");
+      wordCountsDataFrame.show();
     });
 
     ssc.start();

http://git-wip-us.apache.org/repos/asf/spark/blob/de14d35f/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index ed36df8..9d8bd7f 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -72,32 +71,17 @@ public class JavaStatefulNetworkWordCount {
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
            args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);
 
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterator<String> call(String x) {
-        return Arrays.asList(SPACE.split(x)).iterator();
-      }
-    });
+    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
 
-    JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
-        new PairFunction<String, String, Integer>() {
-          @Override
-          public Tuple2<String, Integer> call(String s) {
-            return new Tuple2<>(s, 1);
-          }
-        });
+    JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(s -> new Tuple2<>(s, 1));
 
     // Update the cumulative count function
     Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
-        new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
-          @Override
-          public Tuple2<String, Integer> call(String word, Optional<Integer> one,
-              State<Integer> state) {
-            int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
-            Tuple2<String, Integer> output = new Tuple2<>(word, sum);
-            state.update(sum);
-            return output;
-          }
+        (word, one, state) -> {
+          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
+          Tuple2<String, Integer> output = new Tuple2<>(word, sum);
+          state.update(sum);
+          return output;
         };
 
     // DStream made of get cumulative counts that get updated in every batch
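
The stateful mapping function shows the same treatment applied to one of Spark's higher-arity interfaces: Function3 declares a single call(T1, T2, T3) method, and the declared type of mappingFunc pins down the parameter types, so the three-argument lambda needs no type annotations:

    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
        (word, one, state) -> {
          int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
          Tuple2<String, Integer> output = new Tuple2<>(word, sum);
          state.update(sum);
          return output;
        };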

