Repository: incubator-spark
Updated Branches:
  refs/heads/master 1aa4f8af7 -> 29ac7ea52
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29ac7ea5/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
deleted file mode 100644
index 20232e9..0000000
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ /dev/null
@@ -1,981 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.api.java.JavaDoubleRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.partial.BoundedDouble;
-import org.apache.spark.partial.PartialResult;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.util.StatCounter;
-
-// The test suite itself is Serializable so that anonymous Function implementations can be
-// serialized, as an alternative to converting these anonymous classes to static inner classes;
-// see http://stackoverflow.com/questions/758570/.
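
For context, the static-inner-class alternative mentioned in the comment above would look roughly like the sketch below. The sketch is illustrative only and not part of the deleted file; it assumes this era of the Java API, where Function is an abstract class (hence "extends"):

    // A static nested class holds no implicit reference to the enclosing test
    // suite, so the suite itself would not need to implement Serializable.
    static class GetLength extends Function<String, Integer> {
      @Override
      public Integer call(String s) {
        return s.length();
      }
    }
    // Usage: JavaRDD<Integer> lengths = lines.map(new GetLength());
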
-public class JavaAPISuite implements Serializable {
-  private transient JavaSparkContext sc;
-
-  @Before
-  public void setUp() {
-    sc = new JavaSparkContext("local", "JavaAPISuite");
-  }
-
-  @After
-  public void tearDown() {
-    sc.stop();
-    sc = null;
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port");
-  }
-
-  static class ReverseIntComparator implements Comparator<Integer>, Serializable {
-
-    @Override
-    public int compare(Integer a, Integer b) {
-      if (a > b) return -1;
-      else if (a < b) return 1;
-      else return 0;
-    }
-  }
-
-  @Test
-  public void sparkContextUnion() {
-    // Union of non-specialized JavaRDDs
-    List<String> strings = Arrays.asList("Hello", "World");
-    JavaRDD<String> s1 = sc.parallelize(strings);
-    JavaRDD<String> s2 = sc.parallelize(strings);
-    // Varargs
-    JavaRDD<String> sUnion = sc.union(s1, s2);
-    Assert.assertEquals(4, sUnion.count());
-    // List
-    List<JavaRDD<String>> list = new ArrayList<JavaRDD<String>>();
-    list.add(s2);
-    sUnion = sc.union(s1, list);
-    Assert.assertEquals(4, sUnion.count());
-
-    // Union of JavaDoubleRDDs
-    List<Double> doubles = Arrays.asList(1.0, 2.0);
-    JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles);
-    JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles);
-    JavaDoubleRDD dUnion = sc.union(d1, d2);
-    Assert.assertEquals(4, dUnion.count());
-
-    // Union of JavaPairRDDs
-    List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
-    pairs.add(new Tuple2<Integer, Integer>(1, 2));
-    pairs.add(new Tuple2<Integer, Integer>(3, 4));
-    JavaPairRDD<Integer, Integer> p1 = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> p2 = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> pUnion = sc.union(p1, p2);
-    Assert.assertEquals(4, pUnion.count());
-  }
-
-  @Test
-  public void sortByKey() {
-    List<Tuple2<Integer, Integer>> pairs = new ArrayList<Tuple2<Integer, Integer>>();
-    pairs.add(new Tuple2<Integer, Integer>(0, 4));
-    pairs.add(new Tuple2<Integer, Integer>(3, 2));
-    pairs.add(new Tuple2<Integer, Integer>(-1, 1));
-
-    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-
-    // Default comparator
-    JavaPairRDD<Integer, Integer> sortedRDD = rdd.sortByKey();
-    Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
-    List<Tuple2<Integer, Integer>> sortedPairs = sortedRDD.collect();
-    Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
-    Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
-
-    // Custom comparator
-    sortedRDD = rdd.sortByKey(new ReverseIntComparator(), false);
-    Assert.assertEquals(new Tuple2<Integer, Integer>(-1, 1), sortedRDD.first());
-    sortedPairs = sortedRDD.collect();
-    Assert.assertEquals(new Tuple2<Integer, Integer>(0, 4), sortedPairs.get(1));
-    Assert.assertEquals(new Tuple2<Integer, Integer>(3, 2), sortedPairs.get(2));
-  }
-
-  static int foreachCalls = 0;
-
-  @Test
-  public void foreach() {
-    foreachCalls = 0;
-    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreach(new VoidFunction<String>() {
-      @Override
-      public void call(String s) {
-        foreachCalls++;
-      }
-    });
-    Assert.assertEquals(2, foreachCalls);
-  }
-
-  @Test
-  public void lookup() {
-    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
-      new Tuple2<String, String>("Apples", "Fruit"),
-      new Tuple2<String, String>("Oranges", "Fruit"),
-      new Tuple2<String, String>("Oranges", "Citrus")
-    ));
-    Assert.assertEquals(2, categories.lookup("Oranges").size());
categories.lookup("Oranges").size()); - Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size()); - } - - @Test - public void groupBy() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() { - @Override - public Boolean call(Integer x) { - return x % 2 == 0; - } - }; - JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd); - Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens - Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds - - oddsAndEvens = rdd.groupBy(isOdd, 1); - Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens - Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds - } - - @Test - public void cogroup() { - JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, String>("Apples", "Fruit"), - new Tuple2<String, String>("Oranges", "Fruit"), - new Tuple2<String, String>("Oranges", "Citrus") - )); - JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList( - new Tuple2<String, Integer>("Oranges", 2), - new Tuple2<String, Integer>("Apples", 3) - )); - JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> cogrouped = categories.cogroup(prices); - Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString()); - Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString()); - - cogrouped.collect(); - } - - @Test - public void leftOuterJoin() { - JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(1, 2), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(3, 1) - )); - JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( - new Tuple2<Integer, Character>(1, 'x'), - new Tuple2<Integer, Character>(2, 'y'), - new Tuple2<Integer, Character>(2, 'z'), - new Tuple2<Integer, Character>(4, 'w') - )); - List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined = - rdd1.leftOuterJoin(rdd2).collect(); - Assert.assertEquals(5, joined.size()); - Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched = - rdd1.leftOuterJoin(rdd2).filter( - new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() { - @Override - public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup) - throws Exception { - return !tup._2()._2().isPresent(); - } - }).first(); - Assert.assertEquals(3, firstUnmatched._1().intValue()); - } - - @Test - public void foldReduce() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer a, Integer b) { - return a + b; - } - }; - - int sum = rdd.fold(0, add); - Assert.assertEquals(33, sum); - - sum = rdd.reduce(add); - Assert.assertEquals(33, sum); - } - - @Test - public void foldByKey() { - List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(2, 1), - new Tuple2<Integer, Integer>(1, 1), - new Tuple2<Integer, Integer>(3, 2), - new Tuple2<Integer, Integer>(3, 1) - ); - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> sums = 
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer a, Integer b) {
-          return a + b;
-        }
-      });
-    Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
-    Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
-    Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
-  }
-
-  @Test
-  public void reduceByKey() {
-    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(2, 1),
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(3, 2),
-      new Tuple2<Integer, Integer>(3, 1)
-    );
-    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
-    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
-      new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer a, Integer b) {
-          return a + b;
-        }
-      });
-    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
-    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
-    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());
-
-    Map<Integer, Integer> localCounts = counts.collectAsMap();
-    Assert.assertEquals(1, localCounts.get(1).intValue());
-    Assert.assertEquals(2, localCounts.get(2).intValue());
-    Assert.assertEquals(3, localCounts.get(3).intValue());
-
-    localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer, Integer>() {
-      @Override
-      public Integer call(Integer a, Integer b) {
-        return a + b;
-      }
-    });
-    Assert.assertEquals(1, localCounts.get(1).intValue());
-    Assert.assertEquals(2, localCounts.get(2).intValue());
-    Assert.assertEquals(3, localCounts.get(3).intValue());
-  }
-
-  @Test
-  public void approximateResults() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Map<Integer, Long> countsByValue = rdd.countByValue();
-    Assert.assertEquals(2, countsByValue.get(1).longValue());
-    Assert.assertEquals(1, countsByValue.get(13).longValue());
-
-    PartialResult<Map<Integer, BoundedDouble>> approx = rdd.countByValueApprox(1);
-    Map<Integer, BoundedDouble> finalValue = approx.getFinalValue();
-    Assert.assertEquals(2.0, finalValue.get(1).mean(), 0.01);
-    Assert.assertEquals(1.0, finalValue.get(13).mean(), 0.01);
-  }
-
-  @Test
-  public void take() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
-    Assert.assertEquals(1, rdd.first().intValue());
-    List<Integer> firstTwo = rdd.take(2);
-    List<Integer> sample = rdd.takeSample(false, 2, 42);
-  }
-
-  @Test
-  public void cartesian() {
-    JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-    JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("Hello", "World"));
-    JavaPairRDD<String, Double> cartesian = stringRDD.cartesian(doubleRDD);
-    Assert.assertEquals(new Tuple2<String, Double>("Hello", 1.0), cartesian.first());
-  }
-
-  @Test
-  public void javaDoubleRDD() {
-    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-    JavaDoubleRDD distinct = rdd.distinct();
-    Assert.assertEquals(5, distinct.count());
-    JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
-      @Override
-      public Boolean call(Double x) {
-        return x > 2.0;
-      }
-    });
-    Assert.assertEquals(3, filter.count());
-    JavaDoubleRDD union = rdd.union(rdd);
-    Assert.assertEquals(12, union.count());
-    union = union.cache();
-    Assert.assertEquals(12, union.count());
-
-    Assert.assertEquals(20, rdd.sum(), 0.01);
-    StatCounter stats = rdd.stats();
-    Assert.assertEquals(20, stats.sum(), 0.01);
-    Assert.assertEquals(20/6.0, stats.mean(), 0.01);
-    Assert.assertEquals(20/6.0, rdd.mean(), 0.01);
-    Assert.assertEquals(6.22222, rdd.variance(), 0.01);
-    Assert.assertEquals(7.46667, rdd.sampleVariance(), 0.01);
-    Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
-    Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);
-
-    Double first = rdd.first();
-    List<Double> take = rdd.take(5);
-  }
-
-  @Test
-  public void javaDoubleRDDHistogram() {
-    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
-    // Test using generated buckets
-    Tuple2<double[], long[]> results = rdd.histogram(2);
-    double[] expected_buckets = {1.0, 2.5, 4.0};
-    long[] expected_counts = {2, 2};
-    Assert.assertArrayEquals(expected_buckets, results._1, 0.1);
-    Assert.assertArrayEquals(expected_counts, results._2);
-    // Test with provided buckets
-    long[] histogram = rdd.histogram(expected_buckets);
-    Assert.assertArrayEquals(expected_counts, histogram);
-  }
-
-  @Test
-  public void map() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
-      @Override
-      public Double call(Integer x) {
-        return 1.0 * x;
-      }
-    }).cache();
-    doubles.collect();
-    JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer x) {
-        return new Tuple2<Integer, Integer>(x, x);
-      }
-    }).cache();
-    pairs.collect();
-    JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
-      @Override
-      public String call(Integer x) {
-        return x.toString();
-      }
-    }).cache();
-    strings.collect();
-  }
-
-  @Test
-  public void flatMap() {
-    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
-      "The quick brown fox jumps over the lazy dog."));
-    JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Arrays.asList(x.split(" "));
-      }
-    });
-    Assert.assertEquals("Hello", words.first());
-    Assert.assertEquals(11, words.count());
-
-    JavaPairRDD<String, String> pairs = rdd.flatMap(
-      new PairFlatMapFunction<String, String, String>() {
-        @Override
-        public Iterable<Tuple2<String, String>> call(String s) {
-          List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
-          for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
-          return pairs;
-        }
-      }
-    );
-    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
-    Assert.assertEquals(11, pairs.count());
-
-    JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
-      @Override
-      public Iterable<Double> call(String s) {
-        List<Double> lengths = new LinkedList<Double>();
-        for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
-        return lengths;
-      }
-    });
-    Double x = doubles.first();
-    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
-    Assert.assertEquals(11, doubles.count());
-  }
-
-  @Test
-  public void mapsFromPairsToPairs() {
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
-
-    // Regression test for SPARK-668:
-    JavaPairRDD<String, Integer> swapped = pairRDD.flatMap(
-      new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
-        @Override
-        public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item)
-          throws Exception {
-          return Collections.singletonList(item.swap());
-        }
-      });
-    swapped.collect();
-
-    // There was never a bug here, but it's worth testing:
-    pairRDD.map(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
-      @Override
-      public Tuple2<String, Integer> call(Tuple2<Integer, String> item) throws Exception {
-        return item.swap();
-      }
-    }).collect();
-  }
-
-  @Test
-  public void mapPartitions() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-    JavaRDD<Integer> partitionSums = rdd.mapPartitions(
-      new FlatMapFunction<Iterator<Integer>, Integer>() {
-        @Override
-        public Iterable<Integer> call(Iterator<Integer> iter) {
-          int sum = 0;
-          while (iter.hasNext()) {
-            sum += iter.next();
-          }
-          return Collections.singletonList(sum);
-        }
-      });
-    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
-  }
-
-  @Test
-  public void repartition() {
-    // Growing number of partitions
-    JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
-    JavaRDD<Integer> repartitioned1 = in1.repartition(4);
-    List<List<Integer>> result1 = repartitioned1.glom().collect();
-    Assert.assertEquals(4, result1.size());
-    for (List<Integer> l : result1) {
-      Assert.assertTrue(l.size() > 0);
-    }
-
-    // Shrinking number of partitions
-    JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4);
-    JavaRDD<Integer> repartitioned2 = in2.repartition(2);
-    List<List<Integer>> result2 = repartitioned2.glom().collect();
-    Assert.assertEquals(2, result2.size());
-    for (List<Integer> l : result2) {
-      Assert.assertTrue(l.size() > 0);
-    }
-  }
-
-  @Test
-  public void persist() {
-    JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
-    doubleRDD = doubleRDD.persist(StorageLevel.DISK_ONLY());
-    Assert.assertEquals(20, doubleRDD.sum(), 0.1);
-
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);
-    pairRDD = pairRDD.persist(StorageLevel.DISK_ONLY());
-    Assert.assertEquals("a", pairRDD.first()._2());
-
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    rdd = rdd.persist(StorageLevel.DISK_ONLY());
-    Assert.assertEquals(1, rdd.first().intValue());
-  }
-
-  @Test
-  public void iterator() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
-    TaskContext context = new TaskContext(0, 0, 0, false, false, null);
-    Assert.assertEquals(1, rdd.iterator(rdd.splits().get(0), context).next().intValue());
-  }
-
-  @Test
-  public void glom() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
-    Assert.assertEquals("[1, 2]", rdd.glom().first().toString());
-  }
-
-  // File input / output tests are largely adapted from FileSuite:
-
-  @Test
-  public void textFiles() throws IOException {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-    rdd.saveAsTextFile(outputDir);
-    // Read the plain text file and check it's OK
-    File outputFile = new File(outputDir, "part-00000");
-    String content = Files.toString(outputFile, Charsets.UTF_8);
-    Assert.assertEquals("1\n2\n3\n4\n", content);
-    // Also try reading it in as a text file RDD
-    List<String> expected = Arrays.asList("1", "2", "3", "4");
-    JavaRDD<String> readRDD = sc.textFile(outputDir);
-    Assert.assertEquals(expected, readRDD.collect());
-  }
-
-  @Test
-  public void textFilesCompressed() throws IOException {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-    rdd.saveAsTextFile(outputDir, DefaultCodec.class);
-
-    // Try reading it in as a text file RDD
-    List<String> expected = Arrays.asList("1", "2", "3", "4");
-    JavaRDD<String> readRDD = sc.textFile(outputDir);
-    Assert.assertEquals(expected, readRDD.collect());
-  }
-
-  @Test
-  public void sequenceFile() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-    // Try reading the output back as an object file
-    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
-      Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
-      @Override
-      public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
-        return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
-      }
-    });
-    Assert.assertEquals(pairs, readRDD.collect());
-  }
-
-  @Test
-  public void writeWithNewAPIHadoopFile() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
-      org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
-
-    JavaPairRDD<IntWritable, Text> output = sc.sequenceFile(outputDir, IntWritable.class,
-      Text.class);
-    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
-      String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
-  }
-
-  @Test
-  public void readWithNewAPIHadoopFile() throws IOException {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-    JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
-      org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class,
-      Text.class, new Job().getConfiguration());
-    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
-      String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
-  }
-
-  @Test
-  public void objectFilesOfInts() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-    rdd.saveAsObjectFile(outputDir);
-    // Try reading the output back as an object file
-    List<Integer> expected = Arrays.asList(1, 2, 3, 4);
-    JavaRDD<Integer> readRDD = sc.objectFile(outputDir);
-    Assert.assertEquals(expected, readRDD.collect());
-  }
-
-  @Test
-  public void objectFilesOfComplexTypes() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-    rdd.saveAsObjectFile(outputDir);
-    // Try reading the output back as an object file
-    JavaRDD<Tuple2<Integer, String>> readRDD = sc.objectFile(outputDir);
-    Assert.assertEquals(pairs, readRDD.collect());
-  }
-
-  @Test
-  public void hadoopFile() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
-
-    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
-      SequenceFileInputFormat.class, IntWritable.class, Text.class);
-    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
-      String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
-  }
-
-  @Test
-  public void hadoopFileCompressed() {
-    File tempDir = Files.createTempDir();
-    String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
-    List<Tuple2<Integer, String>> pairs = Arrays.asList(
-      new Tuple2<Integer, String>(1, "a"),
-      new Tuple2<Integer, String>(2, "aa"),
-      new Tuple2<Integer, String>(3, "aaa")
-    );
-    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
-
-    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
-      @Override
-      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
-        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
-      }
-    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
-      DefaultCodec.class);
-
-    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
-      SequenceFileInputFormat.class, IntWritable.class, Text.class);
-
-    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
-      String>() {
-      @Override
-      public String call(Tuple2<IntWritable, Text> x) {
-        return x.toString();
-      }
-    }).collect().toString());
-  }
-
-  @Test
-  public void zip() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
-      @Override
-      public Double call(Integer x) {
-        return 1.0 * x;
-      }
-    });
-    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
-    zipped.count();
-  }
-
-  @Test
-  public void zipPartitions() {
-    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
-    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
-    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
-      new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
-        @Override
-        public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
-          int sizeI = 0;
-          int sizeS = 0;
-          while (i.hasNext()) {
-            sizeI += 1;
-            i.next();
-          }
-          while (s.hasNext()) {
-            sizeS += 1;
-            s.next();
-          }
-          return Arrays.asList(sizeI, sizeS);
-        }
-      };
-
-    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
-    Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
-  }
-
-  @Test
-  public void accumulators() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-
-    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
-    rdd.foreach(new VoidFunction<Integer>() {
-      public void call(Integer x) {
-        intAccum.add(x);
-      }
-    });
-    Assert.assertEquals((Integer) 25, intAccum.value());
-
-    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
-    rdd.foreach(new VoidFunction<Integer>() {
-      public void call(Integer x) {
-        doubleAccum.add((double) x);
-      }
-    });
-    Assert.assertEquals((Double) 25.0, doubleAccum.value());
-
-    // Try a custom accumulator type
-    AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
-      public Float addInPlace(Float r, Float t) {
-        return r + t;
-      }
-
-      public Float addAccumulator(Float r, Float t) {
-        return r + t;
-      }
-
-      public Float zero(Float initialValue) {
-        return 0.0f;
-      }
-    };
-
-    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
-    rdd.foreach(new VoidFunction<Integer>() {
-      public void call(Integer x) {
-        floatAccum.add((float) x);
-      }
-    });
-    Assert.assertEquals((Float) 25.0f, floatAccum.value());
-
-    // Test the setValue method
-    floatAccum.setValue(5.0f);
-    Assert.assertEquals((Float) 5.0f, floatAccum.value());
-  }
-
-  @Test
-  public void keyBy() {
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
-    List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
-      public String call(Integer t) throws Exception {
-        return t.toString();
-      }
-    }).collect();
-    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
-    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
-  }
-
-  @Test
-  public void checkpointAndComputation() {
-    File tempDir = Files.createTempDir();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath());
-    Assert.assertEquals(false, rdd.isCheckpointed());
-    rdd.checkpoint();
-    rdd.count(); // Forces the DAG to cause a checkpoint
-    Assert.assertEquals(true, rdd.isCheckpointed());
-    Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
-  }
-
-  @Test
-  public void checkpointAndRestore() {
-    File tempDir = Files.createTempDir();
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath());
-    Assert.assertEquals(false, rdd.isCheckpointed());
-    rdd.checkpoint();
-    rdd.count(); // Forces the DAG to cause a checkpoint
-    Assert.assertEquals(true, rdd.isCheckpointed());
-
-    Assert.assertTrue(rdd.getCheckpointFile().isPresent());
-    JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
-    Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
-  }
-
-  @Test
-  public void mapOnPairRDD() {
-    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
-        return new Tuple2<Integer, Integer>(i, i % 2);
-      }
-    });
-    JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
-      new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
-        @Override
-        public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
-          return new Tuple2<Integer, Integer>(in._2(), in._1());
-        }
-      });
-    Assert.assertEquals(Arrays.asList(
-      new Tuple2<Integer, Integer>(1, 1),
-      new Tuple2<Integer, Integer>(0, 2),
-      new Tuple2<Integer, Integer>(1, 3),
-      new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
-  }
-
-  @Test
-  public void collectPartitions() {
-    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
-
-    JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
-      @Override
-      public Tuple2<Integer, Integer> call(Integer i) throws Exception {
-        return new Tuple2<Integer, Integer>(i, i % 2);
-      }
-    });
-
-    List[] parts = rdd1.collectPartitions(new int[] {0});
-    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);
-
-    parts = rdd1.collectPartitions(new int[] {1, 2});
-    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
-    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);
-
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
-                                      new Tuple2<Integer, Integer>(2, 0)),
-                        rdd2.collectPartitions(new int[] {0})[0]);
-
-    parts = rdd2.collectPartitions(new int[] {1, 2});
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
-                                      new Tuple2<Integer, Integer>(4, 0)),
-                        parts[0]);
-    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
-                                      new Tuple2<Integer, Integer>(6, 0),
-                                      new Tuple2<Integer, Integer>(7, 1)),
-                        parts[1]);
-  }
-
-  @Test
-  public void countApproxDistinct() {
-    List<Integer> arrayData = new ArrayList<Integer>();
-    int size = 100;
-    for (int i = 0; i < 100000; i++) {
-      arrayData.add(i % size);
-    }
-    JavaRDD<Integer> simpleRdd = sc.parallelize(arrayData, 10);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.2) - size) / (size * 1.0)) < 0.2);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.05) - size) / (size * 1.0)) <= 0.05);
-    Assert.assertTrue(Math.abs((simpleRdd.countApproxDistinct(0.01) - size) / (size * 1.0)) <= 0.01);
-  }
-
-  @Test
-  public void countApproxDistinctByKey() {
-    double relativeSD = 0.001;
-
-    List<Tuple2<Integer, Integer>> arrayData = new ArrayList<Tuple2<Integer, Integer>>();
-    for (int i = 10; i < 100; i++)
-      for (int j = 0; j < i; j++)
-        arrayData.add(new Tuple2<Integer, Integer>(i, j));
-
-    JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
-    List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD).collect();
-    for (Tuple2<Integer, Object> resItem : res) {
-      double count = (double) resItem._1();
-      Long resCount = (Long) resItem._2();
-      Double error = Math.abs((resCount - count) / count);
-      Assert.assertTrue(error < relativeSD);
-    }
-  }
-
-  @Test
-  public void collectAsMapWithIntArrayValues() {
-    // Regression test for SPARK-1040
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
-    JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
-      @Override
-      public Tuple2<Integer, int[]> call(Integer x) throws Exception {
-        return new Tuple2<Integer, int[]>(x, new int[] { x });
-      }
-    });
-    pairRDD.collect(); // Works fine
-    Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
-  }
-}
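
A closing note on a pattern the suite repeats: foldByKey, reduceByKey, and reduceByKeyLocally each declare an identical anonymous Function2 for integer addition. A minimal sketch of factoring that into one shared constant (the ADD name is illustrative, not part of the suite; it relies on Function2 being serializable, as the suite's own use of anonymous Function2 instances already assumes):

    // Hypothetical shared addition function; a single static instance can be
    // reused across all three pair-RDD aggregation calls shown in the suite.
    static final Function2<Integer, Integer, Integer> ADD =
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
          return a + b;
        }
      };
    // Usage: rdd.foldByKey(0, ADD); rdd.reduceByKey(ADD); rdd.reduceByKeyLocally(ADD);
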