[SPARK-15633][MINOR] Make package name for Java tests consistent ## What changes were proposed in this pull request? This is a simple patch that makes package names for Java 8 test suites consistent. I moved everything to test.org.apache.spark so we can test package-private APIs properly. Also added "java8" as the package name so we can easily run all the tests related to Java 8.
## How was this patch tested? This is a test only change. Author: Reynold Xin <[email protected]> Closes #13364 from rxin/SPARK-15633. (cherry picked from commit 73178c75565e20f53e6ee1478f3d976732c64438) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ada31984 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ada31984 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ada31984 Branch: refs/heads/branch-2.0 Commit: ada319844a42b0b76e92d62faec258bfd0bb10ac Parents: 3801fb4 Author: Reynold Xin <[email protected]> Authored: Fri May 27 21:20:02 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Fri May 27 21:20:08 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkFunSuite.scala | 2 +- .../java/org/apache/spark/Java8APISuite.java | 393 -------- .../spark/sql/Java8DatasetAggregatorSuite.java | 61 -- .../apache/spark/streaming/Java8APISuite.java | 909 ------------------ .../apache/spark/java8/Java8RDDAPISuite.java | 395 ++++++++ .../spark/java8/dstream/Java8APISuite.java | 910 +++++++++++++++++++ .../java8/sql/Java8DatasetAggregatorSuite.java | 62 ++ .../scala/org/apache/spark/JDK8ScalaSuite.scala | 27 - .../org/apache/spark/java8/JDK8ScalaSuite.scala | 30 + .../spark/sql/JavaDatasetAggregatorSuite.java | 134 +++ .../sql/JavaDatasetAggregatorSuiteBase.java | 75 ++ .../org/apache/spark/sql/JavaSaveLoadSuite.java | 106 +++ .../sql/sources/JavaDatasetAggregatorSuite.java | 134 --- .../sources/JavaDatasetAggregatorSuiteBase.java | 75 -- .../spark/sql/sources/JavaSaveLoadSuite.java | 106 --- 15 files changed, 1713 insertions(+), 1706 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ada31984/core/src/test/scala/org/apache/spark/SparkFunSuite.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala index 0081bca..cd87680 100644 --- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.AccumulatorContext /** * Base abstract class for all unit tests in Spark for handling common functionality. */ -private[spark] abstract class SparkFunSuite +abstract class SparkFunSuite extends FunSuite with BeforeAndAfterAll with Logging { http://git-wip-us.apache.org/repos/asf/spark/blob/ada31984/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java ---------------------------------------------------------------------- diff --git a/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java deleted file mode 100644 index 6ac5ca9..0000000 --- a/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ /dev/null @@ -1,393 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark; - -import java.io.File; -import java.io.Serializable; -import java.util.*; - -import scala.Tuple2; - -import com.google.common.collect.Iterables; -import com.google.common.io.Files; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.api.java.JavaDoubleRDD; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.*; -import org.apache.spark.util.Utils; - -/** - * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8 - * lambda syntax. - */ -public class Java8APISuite implements Serializable { - static int foreachCalls = 0; - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaAPISuite"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - } - - @Test - public void foreachWithAnonymousClass() { - foreachCalls = 0; - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(new VoidFunction<String>() { - @Override - public void call(String s) { - foreachCalls++; - } - }); - Assert.assertEquals(2, foreachCalls); - } - - @Test - public void foreach() { - foreachCalls = 0; - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(x -> foreachCalls++); - Assert.assertEquals(2, foreachCalls); - } - - @Test - public void groupBy() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function<Integer, Boolean> isOdd = x -> x % 2 == 0; - JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd); - Assert.assertEquals(2, oddsAndEvens.count()); - 
Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - - oddsAndEvens = rdd.groupBy(isOdd, 1); - Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - } - - @Test - public void leftOuterJoin() { - JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(1, 2), - new Tuple2<>(2, 1), - new Tuple2<>(3, 1) - )); - JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 'x'), - new Tuple2<>(2, 'y'), - new Tuple2<>(2, 'z'), - new Tuple2<>(4, 'w') - )); - List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = - rdd1.leftOuterJoin(rdd2).collect(); - Assert.assertEquals(5, joined.size()); - Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched = - rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first(); - Assert.assertEquals(3, firstUnmatched._1().intValue()); - } - - @Test - public void foldReduce() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function2<Integer, Integer, Integer> add = (a, b) -> a + b; - - int sum = rdd.fold(0, add); - Assert.assertEquals(33, sum); - - sum = rdd.reduce(add); - Assert.assertEquals(33, sum); - } - - @Test - public void foldByKey() { - List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b); - Assert.assertEquals(1, sums.lookup(1).get(0).intValue()); - Assert.assertEquals(2, sums.lookup(2).get(0).intValue()); - Assert.assertEquals(3, 
sums.lookup(3).get(0).intValue()); - } - - @Test - public void reduceByKey() { - List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b); - Assert.assertEquals(1, counts.lookup(1).get(0).intValue()); - Assert.assertEquals(2, counts.lookup(2).get(0).intValue()); - Assert.assertEquals(3, counts.lookup(3).get(0).intValue()); - - Map<Integer, Integer> localCounts = counts.collectAsMap(); - Assert.assertEquals(1, localCounts.get(1).intValue()); - Assert.assertEquals(2, localCounts.get(2).intValue()); - Assert.assertEquals(3, localCounts.get(3).intValue()); - - localCounts = rdd.reduceByKeyLocally((a, b) -> a + b); - Assert.assertEquals(1, localCounts.get(1).intValue()); - Assert.assertEquals(2, localCounts.get(2).intValue()); - Assert.assertEquals(3, localCounts.get(3).intValue()); - } - - @Test - public void map() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache(); - doubles.collect(); - JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) - .cache(); - pairs.collect(); - JavaRDD<String> strings = rdd.map(Object::toString).cache(); - strings.collect(); - } - - @Test - public void flatMap() { - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!", - "The quick brown fox jumps over the lazy dog.")); - JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); - - Assert.assertEquals("Hello", words.first()); - Assert.assertEquals(11, words.count()); - - JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> { - List<Tuple2<String, String>> pairs2 = new LinkedList<>(); - for (String word : s.split(" ")) { - pairs2.add(new Tuple2<>(word, word)); - } - return pairs2.iterator(); - 
}); - - Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first()); - Assert.assertEquals(11, pairs.count()); - - JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { - List<Double> lengths = new LinkedList<>(); - for (String word : s.split(" ")) { - lengths.add((double) word.length()); - } - return lengths.iterator(); - }); - - Assert.assertEquals(5.0, doubles.first(), 0.01); - Assert.assertEquals(11, pairs.count()); - } - - @Test - public void mapsFromPairsToPairs() { - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); - - // Regression test for SPARK-668: - JavaPairRDD<String, Integer> swapped = - pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()).iterator()); - swapped.collect(); - - // There was never a bug here, but it's worth testing: - pairRDD.map(Tuple2::swap).collect(); - } - - @Test - public void mapPartitions() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); - JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> { - int sum = 0; - while (iter.hasNext()) { - sum += iter.next(); - } - return Collections.singletonList(sum).iterator(); - }); - - Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); - } - - @Test - public void sequenceFile() { - File tempDir = Files.createTempDir(); - tempDir.deleteOnExit(); - String outputDir = new File(tempDir, "output").getAbsolutePath(); - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - - rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) - .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); - - // Try reading the output back as an object file - JavaPairRDD<Integer, String> readRDD = 
sc.sequenceFile(outputDir, IntWritable.class, Text.class) - .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); - Assert.assertEquals(pairs, readRDD.collect()); - Utils.deleteRecursively(tempDir); - } - - @Test - public void zip() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x); - JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); - zipped.count(); - } - - @Test - public void zipPartitions() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); - JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); - FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = - (Iterator<Integer> i, Iterator<String> s) -> { - int sizeI = 0; - while (i.hasNext()) { - sizeI += 1; - i.next(); - } - int sizeS = 0; - while (s.hasNext()) { - sizeS += 1; - s.next(); - } - return Arrays.asList(sizeI, sizeS).iterator(); - }; - JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); - Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); - } - - @Test - public void accumulators() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - - Accumulator<Integer> intAccum = sc.intAccumulator(10); - rdd.foreach(intAccum::add); - Assert.assertEquals((Integer) 25, intAccum.value()); - - Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); - rdd.foreach(x -> doubleAccum.add((double) x)); - Assert.assertEquals((Double) 25.0, doubleAccum.value()); - - // Try a custom accumulator type - AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() { - @Override - public Float addInPlace(Float r, Float t) { - return r + t; - } - @Override - public Float addAccumulator(Float r, Float t) { - return r + t; - } - @Override - public Float zero(Float initialValue) { - return 0.0f; - } - }; - - Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); - rdd.foreach(x 
-> floatAccum.add((float) x)); - Assert.assertEquals((Float) 25.0f, floatAccum.value()); - - // Test the setValue method - floatAccum.setValue(5.0f); - Assert.assertEquals((Float) 5.0f, floatAccum.value()); - } - - @Test - public void keyBy() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); - List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect(); - Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); - Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); - } - - @Test - public void mapOnPairRDD() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); - JavaPairRDD<Integer, Integer> rdd3 = - rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); - Assert.assertEquals(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(0, 2), - new Tuple2<>(1, 3), - new Tuple2<>(0, 4)), rdd3.collect()); - } - - @Test - public void collectPartitions() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); - - JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); - List<Integer>[] parts = rdd1.collectPartitions(new int[]{0}); - Assert.assertEquals(Arrays.asList(1, 2), parts[0]); - - parts = rdd1.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(3, 4), parts[0]); - Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); - - Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), - rdd2.collectPartitions(new int[]{0})[0]); - - List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); - Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), - parts2[1]); - } - - @Test - public void collectAsMapWithIntArrayValues() { - // Regression test for SPARK-1040 - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1)); 
- JavaPairRDD<Integer, int[]> pairRDD = - rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); - pairRDD.collect(); // Works fine - pairRDD.collectAsMap(); // Used to crash with ClassCastException - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ada31984/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java ---------------------------------------------------------------------- diff --git a/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java b/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java deleted file mode 100644 index 1a2aea6..0000000 --- a/external/java8-tests/src/test/java/org/apache/spark/sql/Java8DatasetAggregatorSuite.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package test.org.apache.spark.sql.sources; - -import java.util.Arrays; - -import org.junit.Assert; -import org.junit.Test; -import scala.Tuple2; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.KeyValueGroupedDataset; -import org.apache.spark.sql.expressions.javalang.typed; - -/** - * Suite that replicates tests in JavaDatasetAggregatorSuite using lambda syntax. 
- */ -public class Java8DatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase { - @Test - public void testTypedAggregationAverage() { - KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset(); - Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(v -> (double)(v._2() * 2))); - Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 6.0)), agged.collectAsList()); - } - - @Test - public void testTypedAggregationCount() { - KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset(); - Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.count(v -> v)); - Assert.assertEquals(Arrays.asList(tuple2("a", 2), tuple2("b", 1)), agged.collectAsList()); - } - - @Test - public void testTypedAggregationSumDouble() { - KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset(); - Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.sum(v -> (double)v._2())); - Assert.assertEquals(Arrays.asList(tuple2("a", 3.0), tuple2("b", 3.0)), agged.collectAsList()); - } - - @Test - public void testTypedAggregationSumLong() { - KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset(); - Dataset<Tuple2<String, Long>> agged = grouped.agg(typed.sumLong(v -> (long)v._2())); - Assert.assertEquals(Arrays.asList(tuple2("a", 3), tuple2("b", 3)), agged.collectAsList()); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ada31984/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java ---------------------------------------------------------------------- diff --git a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java deleted file mode 100644 index d0fed30..0000000 --- a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ /dev/null @@ -1,909 +0,0 @@ -/* - * Licensed 
to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import java.io.Serializable; -import java.util.*; - -import scala.Tuple2; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.junit.Assert; -import org.junit.Test; - -import org.apache.spark.Accumulator; -import org.apache.spark.HashPartitioner; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaMapWithStateDStream; - -/** - * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 - * lambda syntax. 
- */ -@SuppressWarnings("unchecked") -public class Java8APISuite extends LocalJavaStreamingContext implements Serializable { - - @Test - public void testMap() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("hello", "world"), - Arrays.asList("goodnight", "moon")); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(5, 5), - Arrays.asList(9, 4)); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> letterCount = stream.map(String::length); - JavaTestUtils.attachTestOutputStream(letterCount); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testFilter() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("yankees")); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> filtered = stream.filter(s -> s.contains("a")); - JavaTestUtils.attachTestOutputStream(filtered); - List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testMapPartitions() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOX")); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> mapped = stream.mapPartitions(in -> { - String out = ""; - while (in.hasNext()) { - out = out + in.next().toUpperCase(); - } - return Lists.newArrayList(out).iterator(); - }); - JavaTestUtils.attachTestOutputStream(mapped); - List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 
2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduce() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(15), - Arrays.asList(24)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y); - JavaTestUtils.attachTestOutputStream(reduced); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByWindow() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(21), - Arrays.asList(39), - Arrays.asList(24)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y, - (x, y) -> x - y, new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reducedWindowed); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); - - Assert.assertEquals(expected, result); - } - - @Test - public void testTransform() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(3, 4, 5), - Arrays.asList(6, 7, 8), - Arrays.asList(9, 10, 11)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2)); - - JavaTestUtils.attachTestOutputStream(transformed); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, 
result); - } - - @Test - public void testVariousTransform() { - // tests whether all variations of transform can be called from Java - - List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1)); - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - - List<List<Tuple2<String, Integer>>> pairInputData = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); - - JavaDStream<Integer> transformed1 = stream.transform(in -> null); - JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null); - JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null); - JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null); - JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null); - JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null); - JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null); - JavaPairDStream<String, String> pairTransformed4 = - pairStream.transformToPair((x, time) -> null); - - } - - @Test - public void testTransformWith() { - List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "dodgers"), - new Tuple2<>("new york", "yankees")), - Arrays.asList( - new Tuple2<>("california", "sharks"), - new Tuple2<>("new york", "rangers"))); - - List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "mets")), - Arrays.asList( - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "islanders"))); - - - List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("dodgers", "giants")), - new 
Tuple2<>("new york", - new Tuple2<>("yankees", "mets"))), - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("sharks", "ducks")), - new Tuple2<>("new york", - new Tuple2<>("rangers", "islanders")))); - - JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream<String, Tuple2<String, String>> joined = - pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y)); - - JavaTestUtils.attachTestOutputStream(joined); - List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); - for (List<Tuple2<String, Tuple2<String, String>>> res : result) { - unorderedResult.add(Sets.newHashSet(res)); - } - - Assert.assertEquals(expected, unorderedResult); - } - - - @Test - public void testVariousTransformWith() { - // tests whether all variations of transformWith can be called from Java - - List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1)); - List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x")); - JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); - JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); - - List<List<Tuple2<String, Integer>>> pairInputData1 = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - List<List<Tuple2<Double, Character>>> pairInputData2 = - Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); - JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); - 
JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); - - JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null); - JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null); - - JavaPairDStream<Double, Double> transformed3 = - stream1.transformWithToPair(stream2,(x, y, z) -> null); - - JavaPairDStream<Double, Double> transformed4 = - stream1.transformWithToPair(pairStream1,(x, y, z) -> null); - - JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null); - - JavaDStream<Double> pairTransformed2_ = - pairStream1.transformWith(pairStream1,(x, y, z) -> null); - - JavaPairDStream<Double, Double> pairTransformed3 = - pairStream1.transformWithToPair(stream2,(x, y, z) -> null); - - JavaPairDStream<Double, Double> pairTransformed4 = - pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null); - } - - @Test - public void testStreamingContextTransform() { - List<List<Integer>> stream1input = Arrays.asList( - Arrays.asList(1), - Arrays.asList(2) - ); - - List<List<Integer>> stream2input = Arrays.asList( - Arrays.asList(3), - Arrays.asList(4) - ); - - List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( - Arrays.asList(new Tuple2<>(1, "x")), - Arrays.asList(new Tuple2<>(2, "y")) - ); - - List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), - Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) - ); - - JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); - JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); - JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); - - List<JavaDStream<?>> listOfDStreams1 = 
Arrays.<JavaDStream<?>>asList(stream1, stream2); - - // This is just to test whether this transform to JavaStream compiles - JavaDStream<Long> transformed1 = ssc.transform( - listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - Assert.assertEquals(2, listOfRDDs.size()); - return null; - }); - - List<JavaDStream<?>> listOfDStreams2 = - Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); - - JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( - listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - Assert.assertEquals(3, listOfRDDs.size()); - JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0); - JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1); - JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2); - JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); - PairFunction<Integer, Integer, Integer> mapToTuple = - (Integer i) -> new Tuple2<>(i, i); - return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); - }); - JavaTestUtils.attachTestOutputStream(transformed2); - List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = - JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMap() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("go", "giants"), - Arrays.asList("boo", "dodgers"), - Arrays.asList("athletics")); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"), - Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"), - Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s")); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> flatMapped = stream.flatMap( - s -> Lists.newArrayList(s.split("(?!^)")).iterator()); - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<String>> result = 
JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testForeachRDD() { - final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0); - final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0); - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1,1,1), - Arrays.asList(1,1,1)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output - - stream.foreachRDD(rdd -> { - accumRdd.add(1); - rdd.foreach(x -> accumEle.add(1)); - }); - - // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java - stream.foreachRDD((rdd, time) -> { - return; - }); - - JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(2, accumRdd.value().intValue()); - Assert.assertEquals(6, accumEle.value().intValue()); - } - - @Test - public void testPairFlatMap() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("dodgers"), - Arrays.asList("athletics")); - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(6, "g"), - new Tuple2<>(6, "i"), - new Tuple2<>(6, "a"), - new Tuple2<>(6, "n"), - new Tuple2<>(6, "t"), - new Tuple2<>(6, "s")), - Arrays.asList( - new Tuple2<>(7, "d"), - new Tuple2<>(7, "o"), - new Tuple2<>(7, "d"), - new Tuple2<>(7, "g"), - new Tuple2<>(7, "e"), - new Tuple2<>(7, "r"), - new Tuple2<>(7, "s")), - Arrays.asList( - new Tuple2<>(9, "a"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "h"), - new Tuple2<>(9, "l"), - new Tuple2<>(9, "e"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "i"), - new Tuple2<>(9, "c"), - new Tuple2<>(9, "s"))); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); - for (String 
letter : s.split("(?!^)")) { - out.add(new Tuple2<>(s.length(), letter)); - } - return out.iterator(); - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - /* - * Performs an order-invariant comparison of lists representing two RDD streams. This allows - * us to account for ordering variation within individual RDD's which occurs during windowing. - */ - public static <T extends Comparable<T>> void assertOrderInvariantEquals( - List<List<T>> expected, List<List<T>> actual) { - expected.forEach(list -> Collections.sort(list)); - List<List<T>> sortedActual = new ArrayList<>(); - actual.forEach(list -> { - List<T> sortedList = new ArrayList<>(list); - Collections.sort(sortedList); - sortedActual.add(sortedList); - }); - Assert.assertEquals(expected, sortedActual); - } - - @Test - public void testPairFilter() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("giants", 6)), - Arrays.asList(new Tuple2<>("yankees", 7))); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = - stream.mapToPair(x -> new Tuple2<>(x, x.length())); - JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a")); - JavaTestUtils.attachTestOutputStream(filtered); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers"), - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "yankees"), - new Tuple2<>("new york", "mets")), - Arrays.asList(new Tuple2<>("california", 
"sharks"), - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "rangers"), - new Tuple2<>("new york", "islanders"))); - - List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 1), - new Tuple2<>("california", 3), - new Tuple2<>("new york", 4), - new Tuple2<>("new york", 1)), - Arrays.asList( - new Tuple2<>("california", 5), - new Tuple2<>("california", 5), - new Tuple2<>("new york", 3), - new Tuple2<>("new york", 1))); - - @Test - public void testPairMap() { // Maps pair -> pair of different type - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap()); - JavaTestUtils.attachTestOutputStream(reversed); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairMapPartitions() { // Maps pair -> pair of different type - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - 
JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> { - LinkedList<Tuple2<Integer, String>> out = new LinkedList<>(); - while (in.hasNext()) { - Tuple2<String, Integer> next = in.next(); - out.add(next.swap()); - } - return out.iterator(); - }); - - JavaTestUtils.attachTestOutputStream(reversed); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairMap2() { // Maps pair -> single - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(1, 3, 4, 1), - Arrays.asList(5, 5, 3, 1)); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream<Integer> reversed = pairStream.map(in -> in._2()); - JavaTestUtils.attachTestOutputStream(reversed); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair - List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2)), - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2))); - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o")), - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o"))); - - JavaDStream<Tuple2<String, Integer>> stream = - 
JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> { - List<Tuple2<Integer, String>> out = new LinkedList<>(); - for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<>(in._2(), s.toString())); - } - return out.iterator(); - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairReduceByKey() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y); - - JavaTestUtils.attachTestOutputStream(reduced); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCombineByKey() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> 
combined = pairStream.<Integer>combineByKey(i -> i, - (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2)); - - JavaTestUtils.attachTestOutputStream(combined); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindow() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> reduceWindowed = - pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testUpdateStateByKey() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> { - int out = 0; - if 
(state.isPresent()) { - out = out + state.get(); - } - for (Integer v : values) { - out = out + v; - } - return Optional.of(out); - }); - - JavaTestUtils.attachTestOutputStream(updated); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindowWithInverse() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> reduceWindowed = - pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000), - new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairTransform() { - List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5)), - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5))); - - JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( 
- ssc, inputData, 1); - JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey()); - - JavaTestUtils.attachTestOutputStream(sorted); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairToNormalRDDTransform() { - List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(3, 1, 4, 2), - Arrays.asList(2, 3, 4, 1)); - - JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1())); - JavaTestUtils.attachTestOutputStream(firstParts); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testMapValues() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; - - List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "DODGERS"), - new Tuple2<>("california", "GIANTS"), - new Tuple2<>("new york", "YANKEES"), - new Tuple2<>("new york", "METS")), - Arrays.asList(new Tuple2<>("california", "SHARKS"), - new Tuple2<>("california", "DUCKS"), - new Tuple2<>("new york", "RANGERS"), - new Tuple2<>("new york", "ISLANDERS"))); - - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = 
JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase); - JavaTestUtils.attachTestOutputStream(mapped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMapValues() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; - - List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers1"), - new Tuple2<>("california", "dodgers2"), - new Tuple2<>("california", "giants1"), - new Tuple2<>("california", "giants2"), - new Tuple2<>("new york", "yankees1"), - new Tuple2<>("new york", "yankees2"), - new Tuple2<>("new york", "mets1"), - new Tuple2<>("new york", "mets2")), - Arrays.asList(new Tuple2<>("california", "sharks1"), - new Tuple2<>("california", "sharks2"), - new Tuple2<>("california", "ducks1"), - new Tuple2<>("california", "ducks2"), - new Tuple2<>("new york", "rangers1"), - new Tuple2<>("new york", "rangers2"), - new Tuple2<>("new york", "islanders1"), - new Tuple2<>("new york", "islanders2"))); - - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, String> flatMapped = - pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2")); - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); - } - - /** - * This test is only for testing the APIs. It's not necessary to run it. 
- */ - public void testMapWithStateAPI() { - JavaPairRDD<String, Boolean> initialRDD = null; - JavaPairDStream<String, Integer> wordsDstream = null; - - JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); - - JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots(); - - JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> { - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); - - JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots(); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/ada31984/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java ---------------------------------------------------------------------- diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java new file mode 100644 index 0000000..8ee0e7e --- /dev/null +++ b/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.java8; + +import java.io.File; +import java.io.Serializable; +import java.util.*; + +import scala.Tuple2; + +import com.google.common.collect.Iterables; +import com.google.common.io.Files; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.Accumulator; +import org.apache.spark.AccumulatorParam; +import org.apache.spark.api.java.JavaDoubleRDD; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.*; +import org.apache.spark.util.Utils; + +/** + * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8 + * lambda syntax. 
+ */ +public class Java8RDDAPISuite implements Serializable { + private static int foreachCalls = 0; + private transient JavaSparkContext sc; + + @Before + public void setUp() { + sc = new JavaSparkContext("local", "JavaAPISuite"); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + } + + @Test + public void foreachWithAnonymousClass() { + foreachCalls = 0; + JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); + rdd.foreach(new VoidFunction<String>() { + @Override + public void call(String s) { + foreachCalls++; + } + }); + Assert.assertEquals(2, foreachCalls); + } + + @Test + public void foreach() { + foreachCalls = 0; + JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); + rdd.foreach(x -> foreachCalls++); + Assert.assertEquals(2, foreachCalls); + } + + @Test + public void groupBy() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); + Function<Integer, Boolean> isOdd = x -> x % 2 == 0; + JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd); + Assert.assertEquals(2, oddsAndEvens.count()); + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds + + oddsAndEvens = rdd.groupBy(isOdd, 1); + Assert.assertEquals(2, oddsAndEvens.count()); + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds + } + + @Test + public void leftOuterJoin() { + JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( + new Tuple2<>(1, 1), + new Tuple2<>(1, 2), + new Tuple2<>(2, 1), + new Tuple2<>(3, 1) + )); + JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( + new Tuple2<>(1, 'x'), + new Tuple2<>(2, 'y'), + new Tuple2<>(2, 'z'), + new Tuple2<>(4, 'w') + )); + List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined 
= + rdd1.leftOuterJoin(rdd2).collect(); + Assert.assertEquals(5, joined.size()); + Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched = + rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first(); + Assert.assertEquals(3, firstUnmatched._1().intValue()); + } + + @Test + public void foldReduce() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); + Function2<Integer, Integer, Integer> add = (a, b) -> a + b; + + int sum = rdd.fold(0, add); + Assert.assertEquals(33, sum); + + sum = rdd.reduce(add); + Assert.assertEquals(33, sum); + } + + @Test + public void foldByKey() { + List<Tuple2<Integer, Integer>> pairs = Arrays.asList( + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) + ); + JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); + JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b); + Assert.assertEquals(1, sums.lookup(1).get(0).intValue()); + Assert.assertEquals(2, sums.lookup(2).get(0).intValue()); + Assert.assertEquals(3, sums.lookup(3).get(0).intValue()); + } + + @Test + public void reduceByKey() { + List<Tuple2<Integer, Integer>> pairs = Arrays.asList( + new Tuple2<>(2, 1), + new Tuple2<>(2, 1), + new Tuple2<>(1, 1), + new Tuple2<>(3, 2), + new Tuple2<>(3, 1) + ); + JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); + JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b); + Assert.assertEquals(1, counts.lookup(1).get(0).intValue()); + Assert.assertEquals(2, counts.lookup(2).get(0).intValue()); + Assert.assertEquals(3, counts.lookup(3).get(0).intValue()); + + Map<Integer, Integer> localCounts = counts.collectAsMap(); + Assert.assertEquals(1, localCounts.get(1).intValue()); + Assert.assertEquals(2, localCounts.get(2).intValue()); + Assert.assertEquals(3, localCounts.get(3).intValue()); + + localCounts = rdd.reduceByKeyLocally((a, b) -> a + b); + Assert.assertEquals(1, 
localCounts.get(1).intValue()); + Assert.assertEquals(2, localCounts.get(2).intValue()); + Assert.assertEquals(3, localCounts.get(3).intValue()); + } + + @Test + public void map() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache(); + doubles.collect(); + JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) + .cache(); + pairs.collect(); + JavaRDD<String> strings = rdd.map(Object::toString).cache(); + strings.collect(); + } + + @Test + public void flatMap() { + JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!", + "The quick brown fox jumps over the lazy dog.")); + JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); + + Assert.assertEquals("Hello", words.first()); + Assert.assertEquals(11, words.count()); + + JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> { + List<Tuple2<String, String>> pairs2 = new LinkedList<>(); + for (String word : s.split(" ")) { + pairs2.add(new Tuple2<>(word, word)); + } + return pairs2.iterator(); + }); + + Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first()); + Assert.assertEquals(11, pairs.count()); + + JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { + List<Double> lengths = new LinkedList<>(); + for (String word : s.split(" ")) { + lengths.add((double) word.length()); + } + return lengths.iterator(); + }); + + Assert.assertEquals(5.0, doubles.first(), 0.01); + Assert.assertEquals(11, pairs.count()); + } + + @Test + public void mapsFromPairsToPairs() { + List<Tuple2<Integer, String>> pairs = Arrays.asList( + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") + ); + JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); + + // Regression test for SPARK-668: + JavaPairRDD<String, Integer> swapped = + pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()).iterator()); + swapped.collect(); + + // There was never a bug 
here, but it's worth testing: + pairRDD.map(Tuple2::swap).collect(); + } + + @Test + public void mapPartitions() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); + JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> { + int sum = 0; + while (iter.hasNext()) { + sum += iter.next(); + } + return Collections.singletonList(sum).iterator(); + }); + + Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); + } + + @Test + public void sequenceFile() { + File tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + String outputDir = new File(tempDir, "output").getAbsolutePath(); + List<Tuple2<Integer, String>> pairs = Arrays.asList( + new Tuple2<>(1, "a"), + new Tuple2<>(2, "aa"), + new Tuple2<>(3, "aaa") + ); + JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); + + rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) + .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); + + // Try reading the output back as an object file + JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class) + .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); + Assert.assertEquals(pairs, readRDD.collect()); + Utils.deleteRecursively(tempDir); + } + + @Test + public void zip() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x); + JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); + zipped.count(); + } + + @Test + public void zipPartitions() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); + JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); + FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = + (Iterator<Integer> i, Iterator<String> s) -> { + int sizeI = 0; + while (i.hasNext()) { + sizeI += 1; + i.next(); + } + int sizeS = 0; + while (s.hasNext()) { 
+ sizeS += 1; + s.next(); + } + return Arrays.asList(sizeI, sizeS).iterator(); + }; + JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); + Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); + } + + @Test + public void accumulators() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + + Accumulator<Integer> intAccum = sc.intAccumulator(10); + rdd.foreach(intAccum::add); + Assert.assertEquals((Integer) 25, intAccum.value()); + + Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); + rdd.foreach(x -> doubleAccum.add((double) x)); + Assert.assertEquals((Double) 25.0, doubleAccum.value()); + + // Try a custom accumulator type + AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() { + @Override + public Float addInPlace(Float r, Float t) { + return r + t; + } + @Override + public Float addAccumulator(Float r, Float t) { + return r + t; + } + @Override + public Float zero(Float initialValue) { + return 0.0f; + } + }; + + Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); + rdd.foreach(x -> floatAccum.add((float) x)); + Assert.assertEquals((Float) 25.0f, floatAccum.value()); + + // Test the setValue method + floatAccum.setValue(5.0f); + Assert.assertEquals((Float) 5.0f, floatAccum.value()); + } + + @Test + public void keyBy() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); + List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect(); + Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); + Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); + } + + @Test + public void mapOnPairRDD() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); + JavaPairRDD<Integer, Integer> rdd2 = + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); + JavaPairRDD<Integer, Integer> rdd3 = + rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); + Assert.assertEquals(Arrays.asList( + new Tuple2<>(1, 1), + new Tuple2<>(0, 2), + new Tuple2<>(1, 3), + new 
Tuple2<>(0, 4)), rdd3.collect()); + } + + @Test + public void collectPartitions() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); + + JavaPairRDD<Integer, Integer> rdd2 = + rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); + List<Integer>[] parts = rdd1.collectPartitions(new int[]{0}); + Assert.assertEquals(Arrays.asList(1, 2), parts[0]); + + parts = rdd1.collectPartitions(new int[]{1, 2}); + Assert.assertEquals(Arrays.asList(3, 4), parts[0]); + Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); + + Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), + rdd2.collectPartitions(new int[]{0})[0]); + + List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2}); + Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); + Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), + parts2[1]); + } + + @Test + public void collectAsMapWithIntArrayValues() { + // Regression test for SPARK-1040 + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1)); + JavaPairRDD<Integer, int[]> pairRDD = + rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); + pairRDD.collect(); // Works fine + pairRDD.collectAsMap(); // Used to crash with ClassCastException + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
