http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/core/src/test/java/test/org/apache/spark/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java index 80aab10..5121491 100644 --- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java @@ -31,7 +31,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.*; import org.apache.spark.Accumulator; @@ -208,7 +207,7 @@ public class JavaAPISuite implements Serializable { assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2)); // Custom comparator - sortedRDD = rdd.sortByKey(Collections.<Integer>reverseOrder(), false); + sortedRDD = rdd.sortByKey(Collections.reverseOrder(), false); assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); sortedPairs = sortedRDD.collect(); assertEquals(new Tuple2<>(0, 4), sortedPairs.get(1)); @@ -266,13 +265,7 @@ public class JavaAPISuite implements Serializable { JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(pairs); // compare on first value - JavaRDD<Tuple2<Integer, Integer>> sortedRDD = - rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() { - @Override - public Integer call(Tuple2<Integer, Integer> t) { - return t._1(); - } - }, true, 2); + JavaRDD<Tuple2<Integer, Integer>> sortedRDD = rdd.sortBy(Tuple2::_1, true, 2); assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect(); @@ -280,12 +273,7 @@ public class JavaAPISuite implements Serializable { assertEquals(new Tuple2<>(3, 2), sortedPairs.get(2)); // compare on second value - sortedRDD = rdd.sortBy(new Function<Tuple2<Integer, Integer>, Integer>() { - @Override - public Integer call(Tuple2<Integer, Integer> t) { - return t._2(); - } - }, true, 2); + sortedRDD = rdd.sortBy(Tuple2::_2, true, 2); assertEquals(new Tuple2<>(-1, 1), sortedRDD.first()); sortedPairs = sortedRDD.collect(); assertEquals(new Tuple2<>(3, 2), sortedPairs.get(1)); @@ -294,28 +282,20 @@ public class JavaAPISuite implements Serializable { @Test public void foreach() { - final LongAccumulator accum = sc.sc().longAccumulator(); + LongAccumulator accum = sc.sc().longAccumulator(); JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(new VoidFunction<String>() { - @Override - public void call(String s) { - accum.add(1); - } - }); + rdd.foreach(s -> accum.add(1)); assertEquals(2, accum.value().intValue()); } @Test public void foreachPartition() { - final LongAccumulator accum = sc.sc().longAccumulator(); + LongAccumulator accum = sc.sc().longAccumulator(); JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreachPartition(new VoidFunction<Iterator<String>>() { - @Override - public void call(Iterator<String> iter) { - while (iter.hasNext()) { - iter.next(); - accum.add(1); - } + rdd.foreachPartition(iter -> { + while (iter.hasNext()) { + iter.next(); + accum.add(1); } }); assertEquals(2, accum.value().intValue()); @@ -361,12 +341,7 @@ public class JavaAPISuite implements Serializable { @Test public void groupBy() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() { - @Override - public Boolean call(Integer x) { - return x % 2 == 0; 
- } - }; + Function<Integer, Boolean> isOdd = x -> x % 2 == 0; JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd); assertEquals(2, oddsAndEvens.count()); assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens @@ -383,12 +358,7 @@ public class JavaAPISuite implements Serializable { // Regression test for SPARK-4459 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); Function<Tuple2<Integer, Integer>, Boolean> areOdd = - new Function<Tuple2<Integer, Integer>, Boolean>() { - @Override - public Boolean call(Tuple2<Integer, Integer> x) { - return (x._1() % 2 == 0) && (x._2() % 2 == 0); - } - }; + x -> (x._1() % 2 == 0) && (x._2() % 2 == 0); JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd); JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd); assertEquals(2, oddsAndEvens.count()); @@ -406,13 +376,7 @@ public class JavaAPISuite implements Serializable { public void keyByOnPairRDD() { // Regression test for SPARK-4459 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function<Tuple2<Integer, Integer>, String> sumToString = - new Function<Tuple2<Integer, Integer>, String>() { - @Override - public String call(Tuple2<Integer, Integer> x) { - return String.valueOf(x._1() + x._2()); - } - }; + Function<Tuple2<Integer, Integer>, String> sumToString = x -> String.valueOf(x._1() + x._2()); JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd); JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString); assertEquals(7, keyed.count()); @@ -516,25 +480,14 @@ public class JavaAPISuite implements Serializable { rdd1.leftOuterJoin(rdd2).collect(); assertEquals(5, joined.size()); Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched = - rdd1.leftOuterJoin(rdd2).filter( - new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() { - @Override - public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup) { - return !tup._2()._2().isPresent(); - } - }).first(); + rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first(); assertEquals(3, firstUnmatched._1().intValue()); } @Test public void foldReduce() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }; + Function2<Integer, Integer, Integer> add = (a, b) -> a + b; int sum = rdd.fold(0, add); assertEquals(33, sum); @@ -546,12 +499,7 @@ public class JavaAPISuite implements Serializable { @Test public void treeReduce() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10); - Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }; + Function2<Integer, Integer, Integer> add = (a, b) -> a + b; for (int depth = 1; depth <= 10; depth++) { int sum = rdd.treeReduce(add, depth); assertEquals(-5, sum); @@ -561,12 +509,7 @@ public class JavaAPISuite implements Serializable { @Test public void treeAggregate() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(-5, -4, -3, -2, -1, 1, 2, 3, 4), 10); - Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }; + Function2<Integer, Integer, 
Integer> add = (a, b) -> a + b; for (int depth = 1; depth <= 10; depth++) { int sum = rdd.treeAggregate(0, add, add, depth); assertEquals(-5, sum); @@ -584,21 +527,15 @@ public class JavaAPISuite implements Serializable { new Tuple2<>(5, 1), new Tuple2<>(5, 3)), 2); - Map<Integer, Set<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(), - new Function2<Set<Integer>, Integer, Set<Integer>>() { - @Override - public Set<Integer> call(Set<Integer> a, Integer b) { - a.add(b); - return a; - } - }, - new Function2<Set<Integer>, Set<Integer>, Set<Integer>>() { - @Override - public Set<Integer> call(Set<Integer> a, Set<Integer> b) { - a.addAll(b); - return a; - } - }).collectAsMap(); + Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(), + (a, b) -> { + a.add(b); + return a; + }, + (a, b) -> { + a.addAll(b); + return a; + }).collectAsMap(); assertEquals(3, sets.size()); assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1)); assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3)); @@ -616,13 +553,7 @@ public class JavaAPISuite implements Serializable { new Tuple2<>(3, 1) ); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }); + JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b); assertEquals(1, sums.lookup(1).get(0).intValue()); assertEquals(2, sums.lookup(2).get(0).intValue()); assertEquals(3, sums.lookup(3).get(0).intValue()); @@ -639,13 +570,7 @@ public class JavaAPISuite implements Serializable { new Tuple2<>(3, 1) ); JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey( - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }); + JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b); assertEquals(1, counts.lookup(1).get(0).intValue()); assertEquals(2, counts.lookup(2).get(0).intValue()); assertEquals(3, counts.lookup(3).get(0).intValue()); @@ -655,12 +580,7 @@ public class JavaAPISuite implements Serializable { assertEquals(2, localCounts.get(2).intValue()); assertEquals(3, localCounts.get(3).intValue()); - localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }); + localCounts = rdd.reduceByKeyLocally((a, b) -> a + b); assertEquals(1, localCounts.get(1).intValue()); assertEquals(2, localCounts.get(2).intValue()); assertEquals(3, localCounts.get(3).intValue()); @@ -692,20 +612,8 @@ public class JavaAPISuite implements Serializable { assertTrue(sc.emptyRDD().isEmpty()); assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty()); assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty()); - assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter( - new Function<Integer,Boolean>() { - @Override - public Boolean call(Integer i) { - return i < 0; - } - }).isEmpty()); - assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter( - new Function<Integer, Boolean>() { - @Override - public Boolean call(Integer i) { - return i > 1; - } - }).isEmpty()); + assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(i -> i < 0).isEmpty()); + assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(i -> i > 1).isEmpty()); } @Test @@ -721,12 +629,7 @@ public class JavaAPISuite 
implements Serializable { JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); JavaDoubleRDD distinct = rdd.distinct(); assertEquals(5, distinct.count()); - JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() { - @Override - public Boolean call(Double x) { - return x > 2.0; - } - }); + JavaDoubleRDD filter = rdd.filter(x -> x > 2.0); assertEquals(3, filter.count()); JavaDoubleRDD union = rdd.union(rdd); assertEquals(12, union.count()); @@ -764,7 +667,7 @@ public class JavaAPISuite implements Serializable { // SPARK-5744 assertArrayEquals( new long[] {0}, - sc.parallelizeDoubles(new ArrayList<Double>(0), 1).histogram(new double[]{0.0, 1.0})); + sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0})); } private static class DoubleComparator implements Comparator<Double>, Serializable { @@ -833,12 +736,7 @@ public class JavaAPISuite implements Serializable { @Test public void reduceOnJavaDoubleRDD() { JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0)); - double sum = rdd.reduce(new Function2<Double, Double, Double>() { - @Override - public Double call(Double v1, Double v2) { - return v1 + v2; - } - }); + double sum = rdd.reduce((v1, v2) -> v1 + v2); assertEquals(10.0, sum, 0.001); } @@ -859,27 +757,11 @@ public class JavaAPISuite implements Serializable { @Test public void map() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() { - @Override - public double call(Integer x) { - return x.doubleValue(); - } - }).cache(); + JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue).cache(); doubles.collect(); - JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer x) { - return new Tuple2<>(x, x); - } - }).cache(); + JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)).cache(); pairs.collect(); - JavaRDD<String> strings = rdd.map(new Function<Integer, String>() { - @Override - public String call(Integer x) { - return x.toString(); - } - }).cache(); + JavaRDD<String> strings = rdd.map(Object::toString).cache(); strings.collect(); } @@ -887,39 +769,27 @@ public class JavaAPISuite implements Serializable { public void flatMap() { JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!", "The quick brown fox jumps over the lazy dog.")); - JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(x.split(" ")).iterator(); - } - }); + JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); assertEquals("Hello", words.first()); assertEquals(11, words.count()); - JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair( - new PairFlatMapFunction<String, String, String>() { - @Override - public Iterator<Tuple2<String, String>> call(String s) { - List<Tuple2<String, String>> pairs = new LinkedList<>(); - for (String word : s.split(" ")) { - pairs.add(new Tuple2<>(word, word)); - } - return pairs.iterator(); + JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(s -> { + List<Tuple2<String, String>> pairs = new LinkedList<>(); + for (String word : s.split(" ")) { + pairs.add(new Tuple2<>(word, word)); } + return pairs.iterator(); } ); assertEquals(new Tuple2<>("Hello", "Hello"), pairsRDD.first()); assertEquals(11, pairsRDD.count()); - JavaDoubleRDD 
doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() { - @Override - public Iterator<Double> call(String s) { - List<Double> lengths = new LinkedList<>(); - for (String word : s.split(" ")) { - lengths.add((double) word.length()); - } - return lengths.iterator(); + JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { + List<Double> lengths = new LinkedList<>(); + for (String word : s.split(" ")) { + lengths.add((double) word.length()); } + return lengths.iterator(); }); assertEquals(5.0, doubles.first(), 0.01); assertEquals(11, pairsRDD.count()); @@ -937,37 +807,23 @@ public class JavaAPISuite implements Serializable { // Regression test for SPARK-668: JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair( - new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() { - @Override - public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) { - return Collections.singletonList(item.swap()).iterator(); - } - }); + item -> Collections.singletonList(item.swap()).iterator()); swapped.collect(); // There was never a bug here, but it's worth testing: - pairRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { - @Override - public Tuple2<String, Integer> call(Tuple2<Integer, String> item) { - return item.swap(); - } - }).collect(); + pairRDD.mapToPair(Tuple2::swap).collect(); } @Test public void mapPartitions() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); - JavaRDD<Integer> partitionSums = rdd.mapPartitions( - new FlatMapFunction<Iterator<Integer>, Integer>() { - @Override - public Iterator<Integer> call(Iterator<Integer> iter) { - int sum = 0; - while (iter.hasNext()) { - sum += iter.next(); - } - return Collections.singletonList(sum).iterator(); + JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> { + int sum = 0; + while (iter.hasNext()) { + sum += iter.next(); } - }); + return Collections.singletonList(sum).iterator(); + }); assertEquals("[3, 7]", partitionSums.collect().toString()); } @@ -975,17 +831,13 @@ public class JavaAPISuite implements Serializable { @Test public void mapPartitionsWithIndex() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); - JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex( - new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() { - @Override - public Iterator<Integer> call(Integer index, Iterator<Integer> iter) { - int sum = 0; - while (iter.hasNext()) { - sum += iter.next(); - } - return Collections.singletonList(sum).iterator(); + JavaRDD<Integer> partitionSums = rdd.mapPartitionsWithIndex((index, iter) -> { + int sum = 0; + while (iter.hasNext()) { + sum += iter.next(); } - }, false); + return Collections.singletonList(sum).iterator(); + }, false); assertEquals("[3, 7]", partitionSums.collect().toString()); } @@ -1124,21 +976,12 @@ public class JavaAPISuite implements Serializable { ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); // Try reading the output back as an object file 
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, - Text.class).mapToPair(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() { - @Override - public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) { - return new Tuple2<>(pair._1().get(), pair._2().toString()); - } - }); + Text.class).mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); assertEquals(pairs, readRDD.collect()); } @@ -1179,12 +1022,7 @@ public class JavaAPISuite implements Serializable { channel1.close(); JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName).cache(); - readRDD.foreach(new VoidFunction<Tuple2<String,PortableDataStream>>() { - @Override - public void call(Tuple2<String, PortableDataStream> pair) { - pair._2().toArray(); // force the file to read - } - }); + readRDD.foreach(pair -> pair._2().toArray()); // force the file to read List<Tuple2<String, PortableDataStream>> result = readRDD.collect(); for (Tuple2<String, PortableDataStream> res : result) { @@ -1229,23 +1067,13 @@ public class JavaAPISuite implements Serializable { ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - }).saveAsNewAPIHadoopFile( - outputDir, IntWritable.class, Text.class, + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) + .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class, org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class, Text.class); - assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { - @Override - public String call(Tuple2<IntWritable, Text> x) { - return x.toString(); - } - }).collect().toString()); + assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString()); } @SuppressWarnings("unchecked") @@ -1259,22 +1087,13 @@ public class JavaAPISuite implements Serializable { ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir, org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class, Text.class, Job.getInstance().getConfiguration()); - assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { - @Override - public String call(Tuple2<IntWritable, Text> x) { - return x.toString(); - } - }).collect().toString()); + assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString()); } @Test @@ -1315,21 +1134,12 @@ public class JavaAPISuite implements Serializable { ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, 
Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir, SequenceFileInputFormat.class, IntWritable.class, Text.class); - assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { - @Override - public String call(Tuple2<IntWritable, Text> x) { - return x.toString(); - } - }).collect().toString()); + assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString()); } @SuppressWarnings("unchecked") @@ -1343,34 +1153,19 @@ public class JavaAPISuite implements Serializable { ); JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() { - @Override - public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) { - return new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())); - } - }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class, - DefaultCodec.class); + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class, DefaultCodec.class); JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir, SequenceFileInputFormat.class, IntWritable.class, Text.class); - assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>, String>() { - @Override - public String call(Tuple2<IntWritable, Text> x) { - return x.toString(); - } - }).collect().toString()); + assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString()); } @Test public void zip() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(new DoubleFunction<Integer>() { - @Override - public double call(Integer x) { - return x.doubleValue(); - } - }); + JavaDoubleRDD doubles = rdd.mapToDouble(Integer::doubleValue); JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); zipped.count(); } @@ -1380,12 +1175,7 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = - new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() { - @Override - public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) { - return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator(); - } - }; + (i, s) -> Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator(); JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); @@ -1396,22 +1186,12 @@ public class JavaAPISuite implements Serializable { public void accumulators() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - final Accumulator<Integer> intAccum = sc.intAccumulator(10); - rdd.foreach(new VoidFunction<Integer>() { - @Override - public void call(Integer x) { - intAccum.add(x); - } - }); + Accumulator<Integer> 
intAccum = sc.intAccumulator(10); + rdd.foreach(intAccum::add); assertEquals((Integer) 25, intAccum.value()); - final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); - rdd.foreach(new VoidFunction<Integer>() { - @Override - public void call(Integer x) { - doubleAccum.add((double) x); - } - }); + Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); + rdd.foreach(x -> doubleAccum.add((double) x)); assertEquals((Double) 25.0, doubleAccum.value()); // Try a custom accumulator type @@ -1432,13 +1212,8 @@ public class JavaAPISuite implements Serializable { } }; - final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); - rdd.foreach(new VoidFunction<Integer>() { - @Override - public void call(Integer x) { - floatAccum.add((float) x); - } - }); + Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); + rdd.foreach(x -> floatAccum.add((float) x)); assertEquals((Float) 25.0f, floatAccum.value()); // Test the setValue method @@ -1449,12 +1224,7 @@ public class JavaAPISuite implements Serializable { @Test public void keyBy() { JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); - List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() { - @Override - public String call(Integer t) { - return t.toString(); - } - }).collect(); + List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect(); assertEquals(new Tuple2<>("1", 1), s.get(0)); assertEquals(new Tuple2<>("2", 2), s.get(1)); } @@ -1487,26 +1257,10 @@ public class JavaAPISuite implements Serializable { @Test public void combineByKey() { JavaRDD<Integer> originalRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6)); - Function<Integer, Integer> keyFunction = new Function<Integer, Integer>() { - @Override - public Integer call(Integer v1) { - return v1 % 3; - } - }; - Function<Integer, Integer> createCombinerFunction = new Function<Integer, Integer>() { - @Override - public Integer call(Integer v1) { - return v1; - } - }; + Function<Integer, Integer> keyFunction = v1 -> v1 % 3; + Function<Integer, Integer> createCombinerFunction = v1 -> v1; - Function2<Integer, Integer, Integer> mergeValueFunction = - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer v1, Integer v2) { - return v1 + v2; - } - }; + Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2; JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction) .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction); @@ -1534,20 +1288,8 @@ public class JavaAPISuite implements Serializable { @Test public void mapOnPairRDD() { JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4)); - JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i, i % 2); - } - }); - JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair( - new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) { - return new Tuple2<>(in._2(), in._1()); - } - }); + JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); + JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); assertEquals(Arrays.asList( new Tuple2<>(1, 1), new Tuple2<>(0, 2), @@ -1561,13 +1303,7 @@ public class JavaAPISuite implements Serializable { public void collectPartitions() { 
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); - JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i, i % 2); - } - }); + JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); List<Integer>[] parts = rdd1.collectPartitions(new int[] {0}); assertEquals(Arrays.asList(1, 2), parts[0]); @@ -1623,13 +1359,7 @@ public class JavaAPISuite implements Serializable { public void collectAsMapWithIntArrayValues() { // Regression test for SPARK-1040 JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1)); - JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair( - new PairFunction<Integer, Integer, int[]>() { - @Override - public Tuple2<Integer, int[]> call(Integer x) { - return new Tuple2<>(x, new int[]{x}); - } - }); + JavaPairRDD<Integer, int[]> pairRDD = rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); pairRDD.collect(); // Works fine pairRDD.collectAsMap(); // Used to crash with ClassCastException } @@ -1651,13 +1381,7 @@ public class JavaAPISuite implements Serializable { @SuppressWarnings("unchecked") public void sampleByKey() { JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3); - JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i % 2, 1); - } - }); + JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1)); Map<Integer, Double> fractions = new HashMap<>(); fractions.put(0, 0.5); fractions.put(1, 1.0); @@ -1677,13 +1401,7 @@ public class JavaAPISuite implements Serializable { @SuppressWarnings("unchecked") public void sampleByKeyExact() { JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3); - JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair( - new PairFunction<Integer, Integer, Integer>() { - @Override - public Tuple2<Integer, Integer> call(Integer i) { - return new Tuple2<>(i % 2, 1); - } - }); + JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i % 2, 1)); Map<Integer, Double> fractions = new HashMap<>(); fractions.put(0, 0.5); fractions.put(1, 1.0); @@ -1754,14 +1472,7 @@ public class JavaAPISuite implements Serializable { public void foreachAsync() throws Exception { List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> rdd = sc.parallelize(data, 1); - JavaFutureAction<Void> future = rdd.foreachAsync( - new VoidFunction<Integer>() { - @Override - public void call(Integer integer) { - // intentionally left blank. - } - } - ); + JavaFutureAction<Void> future = rdd.foreachAsync(integer -> {}); future.get(); assertFalse(future.isCancelled()); assertTrue(future.isDone()); @@ -1784,11 +1495,8 @@ public class JavaAPISuite implements Serializable { public void testAsyncActionCancellation() throws Exception { List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> rdd = sc.parallelize(data, 1); - JavaFutureAction<Void> future = rdd.foreachAsync(new VoidFunction<Integer>() { - @Override - public void call(Integer integer) throws InterruptedException { - Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled. - } + JavaFutureAction<Void> future = rdd.foreachAsync(integer -> { + Thread.sleep(10000); // To ensure that the job won't finish before it's cancelled. 
}); future.cancel(true); assertTrue(future.isCancelled()); @@ -1805,7 +1513,7 @@ public class JavaAPISuite implements Serializable { public void testAsyncActionErrorWrapping() throws Exception { List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> rdd = sc.parallelize(data, 1); - JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<Integer>()).countAsync(); + JavaFutureAction<Long> future = rdd.map(new BuggyMapFunction<>()).countAsync(); try { future.get(2, TimeUnit.SECONDS); fail("Expected future.get() for failed job to throw ExcecutionException");
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java index ba57b6b..938cc8d 100644 --- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java +++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java @@ -59,39 +59,39 @@ public class JavaConsumerStrategySuite implements Serializable { ); final ConsumerStrategy<String, String> sub1 = - ConsumerStrategies.<String, String>Subscribe(sTopics, sKafkaParams, sOffsets); + ConsumerStrategies.Subscribe(sTopics, sKafkaParams, sOffsets); final ConsumerStrategy<String, String> sub2 = - ConsumerStrategies.<String, String>Subscribe(sTopics, sKafkaParams); + ConsumerStrategies.Subscribe(sTopics, sKafkaParams); final ConsumerStrategy<String, String> sub3 = - ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams, offsets); + ConsumerStrategies.Subscribe(topics, kafkaParams, offsets); final ConsumerStrategy<String, String> sub4 = - ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams); + ConsumerStrategies.Subscribe(topics, kafkaParams); Assert.assertEquals( sub1.executorKafkaParams().get("bootstrap.servers"), sub3.executorKafkaParams().get("bootstrap.servers")); final ConsumerStrategy<String, String> psub1 = - ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams, sOffsets); + ConsumerStrategies.SubscribePattern(pat, sKafkaParams, sOffsets); final ConsumerStrategy<String, String> psub2 = - ConsumerStrategies.<String, String>SubscribePattern(pat, sKafkaParams); + ConsumerStrategies.SubscribePattern(pat, sKafkaParams); final ConsumerStrategy<String, String> psub3 = - ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams, offsets); + ConsumerStrategies.SubscribePattern(pat, kafkaParams, offsets); final ConsumerStrategy<String, String> psub4 = - ConsumerStrategies.<String, String>SubscribePattern(pat, kafkaParams); + ConsumerStrategies.SubscribePattern(pat, kafkaParams); Assert.assertEquals( psub1.executorKafkaParams().get("bootstrap.servers"), psub3.executorKafkaParams().get("bootstrap.servers")); final ConsumerStrategy<String, String> asn1 = - ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams, sOffsets); + ConsumerStrategies.Assign(sParts, sKafkaParams, sOffsets); final ConsumerStrategy<String, String> asn2 = - ConsumerStrategies.<String, String>Assign(sParts, sKafkaParams); + ConsumerStrategies.Assign(sParts, sKafkaParams); final ConsumerStrategy<String, String> asn3 = - ConsumerStrategies.<String, String>Assign(parts, kafkaParams, offsets); + ConsumerStrategies.Assign(parts, kafkaParams, offsets); final ConsumerStrategy<String, String> asn4 = - ConsumerStrategies.<String, String>Assign(parts, kafkaParams); + ConsumerStrategies.Assign(parts, kafkaParams); Assert.assertEquals( asn1.executorKafkaParams().get("bootstrap.servers"), http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java ---------------------------------------------------------------------- diff --git 
a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java index d569b66..2e050f8 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java @@ -217,7 +217,7 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite { String deployMode = isDriver ? "client" : "cluster"; SparkSubmitCommandBuilder launcher = - newCommandBuilder(Collections.<String>emptyList()); + newCommandBuilder(Collections.emptyList()); launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home")); launcher.master = "yarn"; http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java ---------------------------------------------------------------------- diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java index 3bc35da..9ff7ace 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitOptionParserSuite.java @@ -44,7 +44,7 @@ public class SparkSubmitOptionParserSuite extends BaseSuite { count++; verify(parser).handle(eq(optNames[0]), eq(value)); verify(parser, times(count)).handle(anyString(), anyString()); - verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList())); + verify(parser, times(count)).handleExtraArgs(eq(Collections.emptyList())); } } @@ -54,9 +54,9 @@ public class SparkSubmitOptionParserSuite extends BaseSuite { parser.parse(Arrays.asList(name)); count++; switchCount++; - verify(parser, times(switchCount)).handle(eq(switchNames[0]), same((String) null)); + verify(parser, times(switchCount)).handle(eq(switchNames[0]), same(null)); verify(parser, times(count)).handle(anyString(), any(String.class)); - verify(parser, times(count)).handleExtraArgs(eq(Collections.<String>emptyList())); + verify(parser, times(count)).handleExtraArgs(eq(Collections.emptyList())); } } } @@ -80,7 +80,7 @@ public class SparkSubmitOptionParserSuite extends BaseSuite { List<String> args = Arrays.asList(parser.MASTER + "=" + parser.MASTER); parser.parse(args); verify(parser).handle(eq(parser.MASTER), eq(parser.MASTER)); - verify(parser).handleExtraArgs(eq(Collections.<String>emptyList())); + verify(parser).handleExtraArgs(eq(Collections.emptyList())); } private static class DummyParser extends SparkSubmitOptionParser { http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java index 8c0338e..683ceff 100644 --- a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java @@ -21,16 +21,14 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; -import scala.Tuple2; - import org.junit.Assert; import org.junit.Test; import org.apache.spark.SharedSparkSession; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; 
import org.apache.spark.ml.linalg.Vector; import org.apache.spark.ml.linalg.Vectors; +import org.apache.spark.mllib.linalg.DenseVector; import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.distributed.RowMatrix; import org.apache.spark.sql.Dataset; @@ -69,35 +67,22 @@ public class JavaPCASuite extends SharedSparkSession { JavaRDD<Vector> dataRDD = jsc.parallelize(points, 2); RowMatrix mat = new RowMatrix(dataRDD.map( - new Function<Vector, org.apache.spark.mllib.linalg.Vector>() { - public org.apache.spark.mllib.linalg.Vector call(Vector vector) { - return new org.apache.spark.mllib.linalg.DenseVector(vector.toArray()); - } - } + (Vector vector) -> (org.apache.spark.mllib.linalg.Vector) new DenseVector(vector.toArray()) ).rdd()); Matrix pc = mat.computePrincipalComponents(3); mat.multiply(pc).rows().toJavaRDD(); - JavaRDD<Vector> expected = mat.multiply(pc).rows().toJavaRDD().map( - new Function<org.apache.spark.mllib.linalg.Vector, Vector>() { - public Vector call(org.apache.spark.mllib.linalg.Vector vector) { - return vector.asML(); - } - } - ); + JavaRDD<Vector> expected = mat.multiply(pc).rows().toJavaRDD() + .map(org.apache.spark.mllib.linalg.Vector::asML); - JavaRDD<VectorPair> featuresExpected = dataRDD.zip(expected).map( - new Function<Tuple2<Vector, Vector>, VectorPair>() { - public VectorPair call(Tuple2<Vector, Vector> pair) { - VectorPair featuresExpected = new VectorPair(); - featuresExpected.setFeatures(pair._1()); - featuresExpected.setExpected(pair._2()); - return featuresExpected; - } - } - ); + JavaRDD<VectorPair> featuresExpected = dataRDD.zip(expected).map(pair -> { + VectorPair featuresExpected1 = new VectorPair(); + featuresExpected1.setFeatures(pair._1()); + featuresExpected1.setExpected(pair._2()); + return featuresExpected1; + }); Dataset<Row> df = spark.createDataFrame(featuresExpected, VectorPair.class); PCAModel pca = new PCA() http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java index 6ded42e..65db3d0 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java @@ -25,7 +25,6 @@ import org.junit.Test; import org.apache.spark.SharedSparkSession; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.Vectors; import org.apache.spark.mllib.regression.LabeledPoint; @@ -42,7 +41,7 @@ public class JavaNaiveBayesSuite extends SharedSparkSession { new LabeledPoint(2, Vectors.dense(0.0, 0.0, 2.0)) ); - private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) { + private static int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) { int correct = 0; for (LabeledPoint p : points) { if (model.predict(p.features()) == p.label()) { @@ -80,12 +79,7 @@ public class JavaNaiveBayesSuite extends SharedSparkSession { public void testPredictJavaRDD() { JavaRDD<LabeledPoint> examples = jsc.parallelize(POINTS, 2).cache(); NaiveBayesModel model = NaiveBayes.train(examples.rdd()); - JavaRDD<Vector> vectors = examples.map(new 
Function<LabeledPoint, Vector>() { - @Override - public Vector call(LabeledPoint v) throws Exception { - return v.features(); - } - }); + JavaRDD<Vector> vectors = examples.map(LabeledPoint::features); JavaRDD<Double> predictions = model.predict(vectors); // Should be able to get the first prediction. predictions.first(); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java index 3d62b27..b4196c6 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaBisectingKMeansSuite.java @@ -17,7 +17,7 @@ package org.apache.spark.mllib.clustering; -import com.google.common.collect.Lists; +import java.util.Arrays; import org.junit.Assert; import org.junit.Test; @@ -31,7 +31,7 @@ public class JavaBisectingKMeansSuite extends SharedSparkSession { @Test public void twoDimensionalData() { - JavaRDD<Vector> points = jsc.parallelize(Lists.newArrayList( + JavaRDD<Vector> points = jsc.parallelize(Arrays.asList( Vectors.dense(4, -1), Vectors.dense(4, 1), Vectors.sparse(2, new int[]{0}, new double[]{1.0}) http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java index 08d6713..38ee250 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java @@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import scala.Tuple2; import scala.Tuple3; @@ -30,7 +31,6 @@ import static org.junit.Assert.*; import org.apache.spark.SharedSparkSession; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.linalg.Matrix; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.Vectors; @@ -39,7 +39,7 @@ public class JavaLDASuite extends SharedSparkSession { @Override public void setUp() throws IOException { super.setUp(); - ArrayList<Tuple2<Long, Vector>> tinyCorpus = new ArrayList<>(); + List<Tuple2<Long, Vector>> tinyCorpus = new ArrayList<>(); for (int i = 0; i < LDASuite.tinyCorpus().length; i++) { tinyCorpus.add(new Tuple2<>((Long) LDASuite.tinyCorpus()[i]._1(), LDASuite.tinyCorpus()[i]._2())); @@ -53,7 +53,7 @@ public class JavaLDASuite extends SharedSparkSession { Matrix topics = LDASuite.tinyTopics(); double[] topicConcentration = new double[topics.numRows()]; Arrays.fill(topicConcentration, 1.0D / topics.numRows()); - LocalLDAModel model = new LocalLDAModel(topics, Vectors.dense(topicConcentration), 1D, 100D); + LocalLDAModel model = new LocalLDAModel(topics, Vectors.dense(topicConcentration), 1.0, 100.0); // Check: basic parameters assertEquals(model.k(), tinyK); @@ -87,17 +87,17 @@ public class JavaLDASuite extends SharedSparkSession 
{ // Check: basic parameters LocalLDAModel localModel = model.toLocal(); - assertEquals(model.k(), k); - assertEquals(localModel.k(), k); - assertEquals(model.vocabSize(), tinyVocabSize); - assertEquals(localModel.vocabSize(), tinyVocabSize); - assertEquals(model.topicsMatrix(), localModel.topicsMatrix()); + assertEquals(k, model.k()); + assertEquals(k, localModel.k()); + assertEquals(tinyVocabSize, model.vocabSize()); + assertEquals(tinyVocabSize, localModel.vocabSize()); + assertEquals(localModel.topicsMatrix(), model.topicsMatrix()); // Check: topic summaries Tuple2<int[], double[]>[] roundedTopicSummary = model.describeTopics(); - assertEquals(roundedTopicSummary.length, k); + assertEquals(k, roundedTopicSummary.length); Tuple2<int[], double[]>[] roundedLocalTopicSummary = localModel.describeTopics(); - assertEquals(roundedLocalTopicSummary.length, k); + assertEquals(k, roundedLocalTopicSummary.length); // Check: log probabilities assertTrue(model.logLikelihood() < 0.0); @@ -107,12 +107,8 @@ public class JavaLDASuite extends SharedSparkSession { JavaPairRDD<Long, Vector> topicDistributions = model.javaTopicDistributions(); // SPARK-5562. since the topicDistribution returns the distribution of the non empty docs // over topics. Compare it against nonEmptyCorpus instead of corpus - JavaPairRDD<Long, Vector> nonEmptyCorpus = corpus.filter( - new Function<Tuple2<Long, Vector>, Boolean>() { - public Boolean call(Tuple2<Long, Vector> tuple2) { - return Vectors.norm(tuple2._2(), 1.0) != 0.0; - } - }); + JavaPairRDD<Long, Vector> nonEmptyCorpus = + corpus.filter(tuple2 -> Vectors.norm(tuple2._2(), 1.0) != 0.0); assertEquals(topicDistributions.count(), nonEmptyCorpus.count()); // Check: javaTopTopicsPerDocuments @@ -155,14 +151,14 @@ public class JavaLDASuite extends SharedSparkSession { LDAModel model = lda.run(corpus); // Check: basic parameters - assertEquals(model.k(), k); - assertEquals(model.vocabSize(), tinyVocabSize); + assertEquals(k, model.k()); + assertEquals(tinyVocabSize, model.vocabSize()); // Check: topic summaries Tuple2<int[], double[]>[] roundedTopicSummary = model.describeTopics(); - assertEquals(roundedTopicSummary.length, k); + assertEquals(k, roundedTopicSummary.length); Tuple2<int[], double[]>[] roundedLocalTopicSummary = model.describeTopics(); - assertEquals(roundedLocalTopicSummary.length, k); + assertEquals(k, roundedLocalTopicSummary.length); } @Test @@ -177,7 +173,7 @@ public class JavaLDASuite extends SharedSparkSession { double logPerplexity = toyModel.logPerplexity(pairedDocs); // check: logLikelihood. 
- ArrayList<Tuple2<Long, Vector>> docsSingleWord = new ArrayList<>(); + List<Tuple2<Long, Vector>> docsSingleWord = new ArrayList<>(); docsSingleWord.add(new Tuple2<>(0L, Vectors.dense(1.0, 0.0, 0.0))); JavaPairRDD<Long, Vector> single = JavaPairRDD.fromJavaRDD(jsc.parallelize(docsSingleWord)); double logLikelihood = toyModel.logLikelihood(single); @@ -190,6 +186,6 @@ public class JavaLDASuite extends SharedSparkSession { LDASuite.tinyTopicDescription(); private JavaPairRDD<Long, Vector> corpus; private LocalLDAModel toyModel = LDASuite.toyModel(); - private ArrayList<Tuple2<Long, Vector>> toyData = LDASuite.javaToyData(); + private List<Tuple2<Long, Vector>> toyData = LDASuite.javaToyData(); } http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java index 3451e07..15de566 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/fpm/JavaAssociationRulesSuite.java @@ -31,9 +31,9 @@ public class JavaAssociationRulesSuite extends SharedSparkSession { @SuppressWarnings("unchecked") JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = jsc.parallelize(Arrays.asList( - new FreqItemset<String>(new String[]{"a"}, 15L), - new FreqItemset<String>(new String[]{"b"}, 35L), - new FreqItemset<String>(new String[]{"a", "b"}, 12L) + new FreqItemset<>(new String[]{"a"}, 15L), + new FreqItemset<>(new String[]{"b"}, 35L), + new FreqItemset<>(new String[]{"a", "b"}, 12L) )); JavaRDD<AssociationRules.Rule<String>> results = (new AssociationRules()).run(freqItemsets); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java index a46b132..86c723a 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLinearRegressionSuite.java @@ -24,13 +24,13 @@ import org.junit.Test; import org.apache.spark.SharedSparkSession; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.util.LinearDataGenerator; public class JavaLinearRegressionSuite extends SharedSparkSession { - int validatePrediction(List<LabeledPoint> validationData, LinearRegressionModel model) { + private static int validatePrediction( + List<LabeledPoint> validationData, LinearRegressionModel model) { int numAccurate = 0; for (LabeledPoint point : validationData) { Double prediction = model.predict(point.features()); @@ -87,12 +87,7 @@ public class JavaLinearRegressionSuite extends SharedSparkSession { LinearDataGenerator.generateLinearInputAsList(A, weights, nPoints, 42, 0.1), 2).cache(); LinearRegressionWithSGD linSGDImpl = new LinearRegressionWithSGD(); LinearRegressionModel model = linSGDImpl.run(testRDD.rdd()); - JavaRDD<Vector> vectors = testRDD.map(new Function<LabeledPoint, Vector>() { - @Override - 
public Vector call(LabeledPoint v) throws Exception { - return v.features(); - } - }); + JavaRDD<Vector> vectors = testRDD.map(LabeledPoint::features); JavaRDD<Double> predictions = model.predict(vectors); // Should be able to get the first prediction. predictions.first(); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java b/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java index 1dcbbca..0f71deb 100644 --- a/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java +++ b/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java @@ -25,8 +25,6 @@ import org.junit.Test; import org.apache.spark.SharedSparkSession; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.configuration.Algo; import org.apache.spark.mllib.tree.configuration.Strategy; @@ -35,7 +33,7 @@ import org.apache.spark.mllib.tree.model.DecisionTreeModel; public class JavaDecisionTreeSuite extends SharedSparkSession { - int validatePrediction(List<LabeledPoint> validationData, DecisionTreeModel model) { + private static int validatePrediction(List<LabeledPoint> validationData, DecisionTreeModel model) { int numCorrect = 0; for (LabeledPoint point : validationData) { Double prediction = model.predict(point.features()); @@ -63,7 +61,7 @@ public class JavaDecisionTreeSuite extends SharedSparkSession { DecisionTreeModel model = learner.run(rdd.rdd()); int numCorrect = validatePrediction(arr, model); - Assert.assertTrue(numCorrect == rdd.count()); + Assert.assertEquals(numCorrect, rdd.count()); } @Test @@ -82,15 +80,10 @@ public class JavaDecisionTreeSuite extends SharedSparkSession { DecisionTreeModel model = DecisionTree$.MODULE$.train(rdd.rdd(), strategy); // java compatibility test - JavaRDD<Double> predictions = model.predict(rdd.map(new Function<LabeledPoint, Vector>() { - @Override - public Vector call(LabeledPoint v1) { - return v1.features(); - } - })); + JavaRDD<Double> predictions = model.predict(rdd.map(LabeledPoint::features)); int numCorrect = validatePrediction(arr, model); - Assert.assertTrue(numCorrect == rdd.count()); + Assert.assertEquals(numCorrect, rdd.count()); } } http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 06cd9ea..bf87174 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -157,7 +157,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo // to the accumulator. So we can check if the row groups are filtered or not in test case. 
TaskContext taskContext = TaskContext$.MODULE$.get(); if (taskContext != null) { - Option<AccumulatorV2<?, ?>> accu = (Option<AccumulatorV2<?, ?>>) taskContext.taskMetrics() + Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics() .lookForAccumulatorByName("numRowGroups"); if (accu.isDefined()) { ((LongAccumulator)accu.get()).add((long)blocks.size()); http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java index 8b8a403..6ffccee 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java @@ -35,27 +35,35 @@ public class Java8DatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase public void testTypedAggregationAverage() { KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset(); Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(v -> (double)(v._2() * 2))); - Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList()); + Assert.assertEquals( + Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 6.0)), + agged.collectAsList()); } @Test public void testTypedAggregationCount() { KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset(); Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(v -> v)); - Assert.assertEquals(Arrays.asList(tuple2("a", 2L), tuple2("b", 1L)), agged.collectAsList()); + Assert.assertEquals( + Arrays.asList(new Tuple2<>("a", 2L), new Tuple2<>("b", 1L)), + agged.collectAsList()); } @Test public void testTypedAggregationSumDouble() { KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset(); Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(v -> (double)v._2())); - Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList()); + Assert.assertEquals( + Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 3.0)), + agged.collectAsList()); } @Test public void testTypedAggregationSumLong() { KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset(); Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(v -> (long)v._2())); - Assert.assertEquals(Arrays.asList(tuple2("a", 3L), tuple2("b", 3L)), agged.collectAsList()); + Assert.assertEquals( + Arrays.asList(new Tuple2<>("a", 3L), new Tuple2<>("b", 3L)), + agged.collectAsList()); } } http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java index 573d0e3..bf8ff61 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java @@ -30,7 +30,6 @@ import org.junit.Test; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; import 
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
index 573d0e3..bf8ff61 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -95,12 +94,7 @@ public class JavaApplySchemaSuite implements Serializable {
     personList.add(person2);

     JavaRDD<Row> rowRDD = jsc.parallelize(personList).map(
-      new Function<Person, Row>() {
-        @Override
-        public Row call(Person person) throws Exception {
-          return RowFactory.create(person.getName(), person.getAge());
-        }
-      });
+      person -> RowFactory.create(person.getName(), person.getAge()));

     List<StructField> fields = new ArrayList<>(2);
     fields.add(DataTypes.createStructField("name", DataTypes.StringType, false));
@@ -131,12 +125,7 @@ public class JavaApplySchemaSuite implements Serializable {
     personList.add(person2);

     JavaRDD<Row> rowRDD = jsc.parallelize(personList).map(
-      new Function<Person, Row>() {
-        @Override
-        public Row call(Person person) {
-          return RowFactory.create(person.getName(), person.getAge());
-        }
-      });
+      person -> RowFactory.create(person.getName(), person.getAge()));

     List<StructField> fields = new ArrayList<>(2);
     fields.add(DataTypes.createStructField("", DataTypes.StringType, false));
@@ -146,12 +135,7 @@ public class JavaApplySchemaSuite implements Serializable {
     Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
     df.createOrReplaceTempView("people");
     List<String> actual = spark.sql("SELECT * FROM people").toJavaRDD()
-      .map(new Function<Row, String>() {
-        @Override
-        public String call(Row row) {
-          return row.getString(0) + "_" + row.get(1);
-        }
-      }).collect();
+      .map(row -> row.getString(0) + "_" + row.get(1)).collect();

     List<String> expected = new ArrayList<>(2);
     expected.add("Michael_29");
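The pattern being modernized in JavaApplySchemaSuite (map a JavaRDD to Rows with a lambda, then attach an explicit StructType) looks roughly like the sketch below outside the suite. The class name, input strings, and local SparkSession here are assumptions for illustration, not part of the commit.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class ApplySchemaSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[2]").appName("apply-schema-sketch").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // A lambda replaces the old anonymous Function<..., Row> when building Rows.
        JavaRDD<Row> rowRDD = jsc.parallelize(Arrays.asList("Michael,29", "Yin,28"))
          .map(s -> {
            String[] parts = s.split(",");
            return RowFactory.create(parts[0], Integer.parseInt(parts[1]));
          });

        // Explicit schema applied to the RDD of Rows.
        List<StructField> fields = new ArrayList<>(2);
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, false));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, false));
        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.createOrReplaceTempView("people");
        spark.sql("SELECT name, age FROM people").show();

        spark.stop();
      }
    }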
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index c44fc3d..c3b94a4 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -189,7 +189,7 @@ public class JavaDataFrameSuite {
     for (int i = 0; i < d.length(); i++) {
       Assert.assertEquals(bean.getD().get(i), d.apply(i));
     }
-    // Java.math.BigInteger is equavient to Spark Decimal(38,0)
+    // Java.math.BigInteger is equivalent to Spark Decimal(38,0)
     Assert.assertEquals(new BigDecimal(bean.getE()), first.getDecimal(4));
   }
@@ -231,13 +231,10 @@ public class JavaDataFrameSuite {
     Assert.assertEquals(0, schema2.fieldIndex("id"));
   }

-  private static final Comparator<Row> crosstabRowComparator = new Comparator<Row>() {
-    @Override
-    public int compare(Row row1, Row row2) {
-      String item1 = row1.getString(0);
-      String item2 = row2.getString(0);
-      return item1.compareTo(item2);
-    }
+  private static final Comparator<Row> crosstabRowComparator = (row1, row2) -> {
+    String item1 = row1.getString(0);
+    String item2 = row2.getString(0);
+    return item1.compareTo(item2);
   };

   @Test
@@ -249,7 +246,7 @@ public class JavaDataFrameSuite {
     Assert.assertEquals("1", columnNames[1]);
     Assert.assertEquals("2", columnNames[2]);
     List<Row> rows = crosstab.collectAsList();
-    Collections.sort(rows, crosstabRowComparator);
+    rows.sort(crosstabRowComparator);
     Integer count = 1;
     for (Row row : rows) {
       Assert.assertEquals(row.get(0).toString(), count.toString());
@@ -284,7 +281,7 @@ public class JavaDataFrameSuite {
   @Test
   public void testSampleBy() {
     Dataset<Row> df = spark.range(0, 100, 1, 2).select(col("id").mod(3).as("key"));
-    Dataset<Row> sampled = df.stat().<Integer>sampleBy("key", ImmutableMap.of(0, 0.1, 1, 0.2), 0L);
+    Dataset<Row> sampled = df.stat().sampleBy("key", ImmutableMap.of(0, 0.1, 1, 0.2), 0L);
     List<Row> actual = sampled.groupBy("key").count().orderBy("key").collectAsList();
     Assert.assertEquals(0, actual.get(0).getLong(0));
     Assert.assertTrue(0 <= actual.get(0).getLong(1) && actual.get(0).getLong(1) <= 8);
@@ -296,7 +293,7 @@ public class JavaDataFrameSuite {
   public void pivot() {
     Dataset<Row> df = spark.table("courseSales");
     List<Row> actual = df.groupBy("year")
-      .pivot("course", Arrays.<Object>asList("dotNET", "Java"))
+      .pivot("course", Arrays.asList("dotNET", "Java"))
       .agg(sum("earnings")).orderBy("year").collectAsList();

     Assert.assertEquals(2012, actual.get(0).getInt(0));
@@ -352,24 +349,24 @@ public class JavaDataFrameSuite {
     Dataset<Long> df = spark.range(1000);

     CountMinSketch sketch1 = df.stat().countMinSketch("id", 10, 20, 42);
-    Assert.assertEquals(sketch1.totalCount(), 1000);
-    Assert.assertEquals(sketch1.depth(), 10);
-    Assert.assertEquals(sketch1.width(), 20);
+    Assert.assertEquals(1000, sketch1.totalCount());
+    Assert.assertEquals(10, sketch1.depth());
+    Assert.assertEquals(20, sketch1.width());

     CountMinSketch sketch2 = df.stat().countMinSketch(col("id"), 10, 20, 42);
-    Assert.assertEquals(sketch2.totalCount(), 1000);
-    Assert.assertEquals(sketch2.depth(), 10);
-    Assert.assertEquals(sketch2.width(), 20);
+    Assert.assertEquals(1000, sketch2.totalCount());
+    Assert.assertEquals(10, sketch2.depth());
+    Assert.assertEquals(20, sketch2.width());

     CountMinSketch sketch3 = df.stat().countMinSketch("id", 0.001, 0.99, 42);
-    Assert.assertEquals(sketch3.totalCount(), 1000);
-    Assert.assertEquals(sketch3.relativeError(), 0.001, 1e-4);
-    Assert.assertEquals(sketch3.confidence(), 0.99, 5e-3);
+    Assert.assertEquals(1000, sketch3.totalCount());
+    Assert.assertEquals(0.001, sketch3.relativeError(), 1.0e-4);
+    Assert.assertEquals(0.99, sketch3.confidence(), 5.0e-3);

     CountMinSketch sketch4 = df.stat().countMinSketch(col("id"), 0.001, 0.99, 42);
-    Assert.assertEquals(sketch4.totalCount(), 1000);
-    Assert.assertEquals(sketch4.relativeError(), 0.001, 1e-4);
-    Assert.assertEquals(sketch4.confidence(), 0.99, 5e-3);
+    Assert.assertEquals(1000, sketch4.totalCount());
+    Assert.assertEquals(0.001, sketch4.relativeError(), 1.0e-4);
+    Assert.assertEquals(0.99, sketch4.confidence(), 5.0e-3);
   }
@@ -389,13 +386,13 @@ public class JavaDataFrameSuite {
     }

     BloomFilter filter3 = df.stat().bloomFilter("id", 1000, 64 * 5);
-    Assert.assertTrue(filter3.bitSize() == 64 * 5);
+    Assert.assertEquals(64 * 5, filter3.bitSize());
     for (int i = 0; i < 1000; i++) {
       Assert.assertTrue(filter3.mightContain(i));
     }

     BloomFilter filter4 = df.stat().bloomFilter(col("id").multiply(3), 1000, 64 * 5);
-    Assert.assertTrue(filter4.bitSize() == 64 * 5);
+    Assert.assertEquals(64 * 5, filter4.bitSize());
     for (int i = 0; i < 1000; i++) {
       Assert.assertTrue(filter4.mightContain(i * 3));
     }
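The count-min sketch and Bloom filter assertions above go through DataFrameStatFunctions. A bare-bones usage sketch, assuming only a local SparkSession (class name and sample lookups are illustrative, not part of the commit):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.util.sketch.BloomFilter;
    import org.apache.spark.util.sketch.CountMinSketch;

    public class StatSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[2]").appName("stat-sketch").getOrCreate();
        Dataset<Long> df = spark.range(1000);

        // Count-min sketch over the "id" column: depth 10, width 20, seed 42.
        CountMinSketch cms = df.stat().countMinSketch("id", 10, 20, 42);
        long total = cms.totalCount();          // 1000 rows fed into the sketch
        long estimate = cms.estimateCount(7L);  // approximate frequency of the value 7

        // Bloom filter sized for ~1000 items with a 3% false-positive rate.
        BloomFilter bf = df.stat().bloomFilter("id", 1000, 0.03);
        boolean maybePresent = bf.mightContain(500L);   // true for inserted values
        boolean probablyAbsent = bf.mightContain(5000L); // usually false; false positives are possible

        System.out.println(total + " " + estimate + " " + maybePresent + " " + probablyAbsent);
        spark.stop();
      }
    }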
http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
index fe86371..d3769a7 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
@@ -24,7 +24,6 @@ import scala.Tuple2;
 import org.junit.Assert;
 import org.junit.Test;

-import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
@@ -41,7 +40,9 @@ public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
     Dataset<Tuple2<String, Integer>> agged = grouped.agg(new IntSumOf().toColumn());
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
+    Assert.assertEquals(
+      Arrays.asList(new Tuple2<>("a", 3), new Tuple2<>("b", 3)),
+      agged.collectAsList());

     Dataset<Tuple2<String, Integer>> agged2 = grouped.agg(new IntSumOf().toColumn())
       .as(Encoders.tuple(Encoders.STRING(), Encoders.INT()));
@@ -87,48 +88,36 @@ public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
   @Test
   public void testTypedAggregationAverage() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(
-      new MapFunction<Tuple2<String, Integer>, Double>() {
-        public Double call(Tuple2<String, Integer> value) throws Exception {
-          return (double)(value._2() * 2);
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList());
+    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(value -> (double)(value._2() * 2)));
+    Assert.assertEquals(
+      Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 6.0)),
+      agged.collectAsList());
   }

   @Test
   public void testTypedAggregationCount() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(
-      new MapFunction<Tuple2<String, Integer>, Object>() {
-        public Object call(Tuple2<String, Integer> value) throws Exception {
-          return value;
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 2), tuple2("b", 1)), agged.collectAsList());
+    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(value -> value));
+    Assert.assertEquals(
+      Arrays.asList(new Tuple2<>("a", 2L), new Tuple2<>("b", 1L)),
+      agged.collectAsList());
   }

   @Test
   public void testTypedAggregationSumDouble() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(
-      new MapFunction<Tuple2<String, Integer>, Double>() {
-        public Double call(Tuple2<String, Integer> value) throws Exception {
-          return (double)value._2();
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList());
+    Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(value -> (double) value._2()));
+    Assert.assertEquals(
+      Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 3.0)),
+      agged.collectAsList());
  }

   @Test
   public void testTypedAggregationSumLong() {
     KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
-    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(
-      new MapFunction<Tuple2<String, Integer>, Long>() {
-        public Long call(Tuple2<String, Integer> value) throws Exception {
-          return (long)value._2();
-        }
-      }));
-    Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList());
+    Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(value -> (long) value._2()));
+    Assert.assertEquals(
+      Arrays.asList(new Tuple2<>("a", 3L), new Tuple2<>("b", 3L)),
+      agged.collectAsList());
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1487c9af/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
index 8fc4eff..e62db7d 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuiteBase.java
@@ -52,23 +52,13 @@ public class JavaDatasetAggregatorSuiteBase implements Serializable {
     spark = null;
   }

-  protected <T1, T2> Tuple2<T1, T2> tuple2(T1 t1, T2 t2) {
-    return new Tuple2<>(t1, t2);
-  }
-
   protected KeyValueGroupedDataset<String, Tuple2<String, Integer>> generateGroupedDataset() {
     Encoder<Tuple2<String, Integer>> encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT());
     List<Tuple2<String, Integer>> data =
-      Arrays.asList(tuple2("a", 1), tuple2("a", 2), tuple2("b", 3));
+      Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3));
     Dataset<Tuple2<String, Integer>> ds = spark.createDataset(data, encoder);
-    return ds.groupByKey(
-      new MapFunction<Tuple2<String, Integer>, String>() {
-        @Override
-        public String call(Tuple2<String, Integer> value) throws Exception {
-          return value._1();
-        }
-      },
+    return ds.groupByKey((MapFunction<Tuple2<String, Integer>, String>) value -> value._1(),
       Encoders.STRING());
   }
 }
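One detail worth noting in the JavaDatasetAggregatorSuiteBase change: Dataset.groupByKey is overloaded (a Scala-function variant and a MapFunction-plus-Encoder variant), so a bare lambda can be ambiguous to javac, and the patch keeps an explicit (MapFunction<...>) cast to pin the Java overload. A stripped-down illustration of the same disambiguation follows; the class name and sample data are assumptions, not code from the commit.

    import java.util.Arrays;

    import scala.Tuple2;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.KeyValueGroupedDataset;
    import org.apache.spark.sql.SparkSession;

    public class GroupByKeySketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[2]").appName("group-by-key-sketch").getOrCreate();
        Dataset<Tuple2<String, Integer>> ds = spark.createDataset(
          Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2)),
          Encoders.tuple(Encoders.STRING(), Encoders.INT()));

        // The cast tells javac to use the MapFunction/Encoder overload rather than
        // the Scala Function1 overload of groupByKey.
        KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped =
          ds.groupByKey((MapFunction<Tuple2<String, Integer>, String>) Tuple2::_1, Encoders.STRING());

        grouped.count().show();
        spark.stop();
      }
    }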
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
