http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/README.md ---------------------------------------------------------------------- diff --git a/extras/README.md b/extras/README.md deleted file mode 100644 index 1b4174b..0000000 --- a/extras/README.md +++ /dev/null @@ -1 +0,0 @@ -This directory contains build components not included by default in Spark's build.
http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/README.md ---------------------------------------------------------------------- diff --git a/extras/java8-tests/README.md b/extras/java8-tests/README.md deleted file mode 100644 index dc9e87f..0000000 --- a/extras/java8-tests/README.md +++ /dev/null @@ -1,24 +0,0 @@ -# Java 8 Test Suites - -These tests require having Java 8 installed and are isolated from the main Spark build. -If Java 8 is not your system's default Java version, you will need to point Spark's build -to your Java location. The set-up depends a bit on the build system: - -* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass - `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically - include the Java 8 test project. - - `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean "test-only org.apache.spark.Java8APISuite"` - -* For Maven users, - - Maven users can also refer to their Java 8 directory using JAVA_HOME. However, Maven will not - automatically detect the presence of a Java 8 JDK, so a special build profile `-Pjava8-tests` - must be used. - - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests` - `$ JAVA_HOME=/opt/jdk1.8.0/ mvn test -Pjava8-tests -DwildcardSuites=org.apache.spark.Java8APISuite` - - Note that the above command can only be run from project root directory since this module - depends on core and the test-jars of core and streaming. This means an install step is - required to make the test dependencies visible to the Java 8 sub-project. http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/pom.xml ---------------------------------------------------------------------- diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml deleted file mode 100644 index 0ad9c53..0000000 --- a/extras/java8-tests/pom.xml +++ /dev/null @@ -1,161 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -~ Licensed to the Apache Software Foundation (ASF) under one or more -~ contributor license agreements. See the NOTICE file distributed with -~ this work for additional information regarding copyright ownership. -~ The ASF licenses this file to You under the Apache License, Version 2.0 -~ (the "License"); you may not use this file except in compliance with -~ the License. You may obtain a copy of the License at -~ -~ http://www.apache.org/licenses/LICENSE-2.0 -~ -~ Unless required by applicable law or agreed to in writing, software -~ distributed under the License is distributed on an "AS IS" BASIS, -~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -~ See the License for the specific language governing permissions and -~ limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <groupId>org.apache.spark</groupId> - <artifactId>java8-tests_2.11</artifactId> - <packaging>pom</packaging> - <name>Spark Project Java8 Tests POM</name> - - <properties> - <sbt.project.name>java8-tests</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-test-tags_${scala.binary.version}</artifactId> - </dependency> - </dependencies> - - <profiles> - <profile> - <id>java8-tests</id> - </profile> - </profiles> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-install-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - <configuration> - <systemPropertyVariables> - <!-- For some reason surefire isn't setting this log4j file on the - test classpath automatically. So we add it manually. --> - <log4j.configuration> - file:src/test/resources/log4j.properties - </log4j.configuration> - </systemPropertyVariables> - <skipTests>false</skipTests> - <includes> - <include>**/Suite*.java</include> - <include>**/*Suite.java</include> - </includes> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <executions> - <execution> - <id>test-compile-first</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <fork>true</fork> - <verbose>true</verbose> - <forceJavacCompilerUse>true</forceJavacCompilerUse> - <source>1.8</source> - <compilerVersion>1.8</compilerVersion> - <target>1.8</target> - <encoding>UTF-8</encoding> - <maxmem>1024m</maxmem> - </configuration> - </plugin> - <plugin> - <!-- disabled --> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <executions> - <execution> - <phase>none</phase> - </execution> - <execution> - <id>scala-compile-first</id> - <phase>none</phase> - </execution> - <execution> - <id>scala-test-compile-first</id> - <phase>none</phase> - </execution> - <execution> - <id>attach-scaladocs</id> - <phase>none</phase> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java ---------------------------------------------------------------------- diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java deleted file mode 100644 index c0b58e7..0000000 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ /dev/null @@ -1,393 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark; - -import java.io.File; -import java.io.Serializable; -import java.util.*; - -import scala.Tuple2; - -import com.google.common.collect.Iterables; -import com.google.common.io.Files; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.api.java.JavaDoubleRDD; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.*; -import org.apache.spark.util.Utils; - -/** - * Most of these tests replicate org.apache.spark.JavaAPISuite using java 8 - * lambda syntax. - */ -public class Java8APISuite implements Serializable { - static int foreachCalls = 0; - private transient JavaSparkContext sc; - - @Before - public void setUp() { - sc = new JavaSparkContext("local", "JavaAPISuite"); - } - - @After - public void tearDown() { - sc.stop(); - sc = null; - } - - @Test - public void foreachWithAnonymousClass() { - foreachCalls = 0; - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(new VoidFunction<String>() { - @Override - public void call(String s) { - foreachCalls++; - } - }); - Assert.assertEquals(2, foreachCalls); - } - - @Test - public void foreach() { - foreachCalls = 0; - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World")); - rdd.foreach(x -> foreachCalls++); - Assert.assertEquals(2, foreachCalls); - } - - @Test - public void groupBy() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function<Integer, Boolean> isOdd = x -> x % 2 == 0; - JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd); - Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - - oddsAndEvens = rdd.groupBy(isOdd, 1); - Assert.assertEquals(2, oddsAndEvens.count()); - Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens - Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds - } - - @Test - public void leftOuterJoin() { - JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(1, 2), - new Tuple2<>(2, 1), - new Tuple2<>(3, 1) - )); - JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList( - new Tuple2<>(1, 'x'), - new Tuple2<>(2, 'y'), - new Tuple2<>(2, 'z'), - new Tuple2<>(4, 'w') - )); - List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined = - rdd1.leftOuterJoin(rdd2).collect(); - Assert.assertEquals(5, joined.size()); - Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched = - rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first(); - Assert.assertEquals(3, firstUnmatched._1().intValue()); - } - - @Test - public void foldReduce() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); - Function2<Integer, Integer, Integer> add = (a, b) -> a + b; - - int sum = rdd.fold(0, add); - Assert.assertEquals(33, sum); - - sum = rdd.reduce(add); - Assert.assertEquals(33, sum); - } - - @Test - public void foldByKey() { - List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b); - Assert.assertEquals(1, sums.lookup(1).get(0).intValue()); - Assert.assertEquals(2, sums.lookup(2).get(0).intValue()); - Assert.assertEquals(3, sums.lookup(3).get(0).intValue()); - } - - @Test - public void reduceByKey() { - List<Tuple2<Integer, Integer>> pairs = Arrays.asList( - new Tuple2<>(2, 1), - new Tuple2<>(2, 1), - new Tuple2<>(1, 1), - new Tuple2<>(3, 2), - new Tuple2<>(3, 1) - ); - JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs); - JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b); - Assert.assertEquals(1, counts.lookup(1).get(0).intValue()); - Assert.assertEquals(2, counts.lookup(2).get(0).intValue()); - Assert.assertEquals(3, counts.lookup(3).get(0).intValue()); - - Map<Integer, Integer> localCounts = counts.collectAsMap(); - Assert.assertEquals(1, localCounts.get(1).intValue()); - Assert.assertEquals(2, localCounts.get(2).intValue()); - Assert.assertEquals(3, localCounts.get(3).intValue()); - - localCounts = rdd.reduceByKeyLocally((a, b) -> a + b); - Assert.assertEquals(1, localCounts.get(1).intValue()); - Assert.assertEquals(2, localCounts.get(2).intValue()); - Assert.assertEquals(3, localCounts.get(3).intValue()); - } - - @Test - public void map() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache(); - doubles.collect(); - JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x)) - .cache(); - pairs.collect(); - JavaRDD<String> strings = rdd.map(Object::toString).cache(); - strings.collect(); - } - - @Test - public void flatMap() { - JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!", - "The quick brown fox jumps over the lazy dog.")); - JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" "))); - - Assert.assertEquals("Hello", words.first()); - Assert.assertEquals(11, words.count()); - - JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> { - List<Tuple2<String, String>> pairs2 = new LinkedList<>(); - for (String word : s.split(" ")) { - pairs2.add(new Tuple2<>(word, word)); - } - return pairs2; - }); - - Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first()); - Assert.assertEquals(11, pairs.count()); - - JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> { - List<Double> lengths = new LinkedList<>(); - for (String word : s.split(" ")) { - lengths.add((double) word.length()); - } - return lengths; - }); - - Assert.assertEquals(5.0, doubles.first(), 0.01); - Assert.assertEquals(11, pairs.count()); - } - - @Test - public void mapsFromPairsToPairs() { - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs); - - // Regression test for SPARK-668: - JavaPairRDD<String, Integer> swapped = - pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap())); - swapped.collect(); - - // There was never a bug here, but it's worth testing: - pairRDD.map(Tuple2::swap).collect(); - } - - @Test - public void mapPartitions() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2); - JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> { - int sum = 0; - while (iter.hasNext()) { - sum += iter.next(); - } - return Collections.singletonList(sum); - }); - - Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); - } - - @Test - public void sequenceFile() { - File tempDir = Files.createTempDir(); - tempDir.deleteOnExit(); - String outputDir = new File(tempDir, "output").getAbsolutePath(); - List<Tuple2<Integer, String>> pairs = Arrays.asList( - new Tuple2<>(1, "a"), - new Tuple2<>(2, "aa"), - new Tuple2<>(3, "aaa") - ); - JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs); - - rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2()))) - .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); - - // Try reading the output back as an object file - JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class) - .mapToPair(pair -> new Tuple2<>(pair._1().get(), pair._2().toString())); - Assert.assertEquals(pairs, readRDD.collect()); - Utils.deleteRecursively(tempDir); - } - - @Test - public void zip() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x); - JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); - zipped.count(); - } - - @Test - public void zipPartitions() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2); - JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2); - FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn = - (Iterator<Integer> i, Iterator<String> s) -> { - int sizeI = 0; - while (i.hasNext()) { - sizeI += 1; - i.next(); - } - int sizeS = 0; - while (s.hasNext()) { - sizeS += 1; - s.next(); - } - return Arrays.asList(sizeI, sizeS).iterator(); - }; - JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); - Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); - } - - @Test - public void accumulators() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); - - Accumulator<Integer> intAccum = sc.intAccumulator(10); - rdd.foreach(intAccum::add); - Assert.assertEquals((Integer) 25, intAccum.value()); - - Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0); - rdd.foreach(x -> doubleAccum.add((double) x)); - Assert.assertEquals((Double) 25.0, doubleAccum.value()); - - // Try a custom accumulator type - AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() { - @Override - public Float addInPlace(Float r, Float t) { - return r + t; - } - @Override - public Float addAccumulator(Float r, Float t) { - return r + t; - } - @Override - public Float zero(Float initialValue) { - return 0.0f; - } - }; - - Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam); - rdd.foreach(x -> floatAccum.add((float) x)); - Assert.assertEquals((Float) 25.0f, floatAccum.value()); - - // Test the setValue method - floatAccum.setValue(5.0f); - Assert.assertEquals((Float) 5.0f, floatAccum.value()); - } - - @Test - public void keyBy() { - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2)); - List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect(); - Assert.assertEquals(new Tuple2<>("1", 1), s.get(0)); - Assert.assertEquals(new Tuple2<>("2", 2), s.get(1)); - } - - @Test - public void mapOnPairRDD() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4)); - JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); - JavaPairRDD<Integer, Integer> rdd3 = - rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1())); - Assert.assertEquals(Arrays.asList( - new Tuple2<>(1, 1), - new Tuple2<>(0, 2), - new Tuple2<>(1, 3), - new Tuple2<>(0, 4)), rdd3.collect()); - } - - @Test - public void collectPartitions() { - JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3); - - JavaPairRDD<Integer, Integer> rdd2 = - rdd1.mapToPair(i -> new Tuple2<>(i, i % 2)); - List<Integer>[] parts = rdd1.collectPartitions(new int[]{0}); - Assert.assertEquals(Arrays.asList(1, 2), parts[0]); - - parts = rdd1.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(3, 4), parts[0]); - Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]); - - Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)), - rdd2.collectPartitions(new int[]{0})[0]); - - List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2}); - Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]); - Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)), - parts2[1]); - } - - @Test - public void collectAsMapWithIntArrayValues() { - // Regression test for SPARK-1040 - JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1)); - JavaPairRDD<Integer, int[]> pairRDD = - rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x})); - pairRDD.collect(); // Works fine - pairRDD.collectAsMap(); // Used to crash with ClassCastException - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java ---------------------------------------------------------------------- diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java deleted file mode 100644 index 604d818..0000000 --- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ /dev/null @@ -1,905 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import java.io.Serializable; -import java.util.*; - -import scala.Tuple2; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.junit.Assert; -import org.junit.Test; - -import org.apache.spark.Accumulator; -import org.apache.spark.HashPartitioner; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaMapWithStateDStream; - -/** - * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 - * lambda syntax. - */ -@SuppressWarnings("unchecked") -public class Java8APISuite extends LocalJavaStreamingContext implements Serializable { - - @Test - public void testMap() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("hello", "world"), - Arrays.asList("goodnight", "moon")); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(5, 5), - Arrays.asList(9, 4)); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> letterCount = stream.map(String::length); - JavaTestUtils.attachTestOutputStream(letterCount); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testFilter() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("yankees")); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> filtered = stream.filter(s -> s.contains("a")); - JavaTestUtils.attachTestOutputStream(filtered); - List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testMapPartitions() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("GIANTSDODGERS"), - Arrays.asList("YANKEESRED SOX")); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> mapped = stream.mapPartitions(in -> { - String out = ""; - while (in.hasNext()) { - out = out + in.next().toUpperCase(); - } - return Lists.newArrayList(out); - }); - JavaTestUtils.attachTestOutputStream(mapped); - List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduce() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(15), - Arrays.asList(24)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reduced = stream.reduce((x, y) -> x + y); - JavaTestUtils.attachTestOutputStream(reduced); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByWindow() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(6), - Arrays.asList(21), - Arrays.asList(39), - Arrays.asList(24)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y, - (x, y) -> x - y, new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reducedWindowed); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); - - Assert.assertEquals(expected, result); - } - - @Test - public void testTransform() { - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1, 2, 3), - Arrays.asList(4, 5, 6), - Arrays.asList(7, 8, 9)); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(3, 4, 5), - Arrays.asList(6, 7, 8), - Arrays.asList(9, 10, 11)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> transformed = stream.transform(in -> in.map(i -> i + 2)); - - JavaTestUtils.attachTestOutputStream(transformed); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testVariousTransform() { - // tests whether all variations of transform can be called from Java - - List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1)); - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - - List<List<Tuple2<String, Integer>>> pairInputData = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); - - JavaDStream<Integer> transformed1 = stream.transform(in -> null); - JavaDStream<Integer> transformed2 = stream.transform((x, time) -> null); - JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(x -> null); - JavaPairDStream<String, Integer> transformed4 = stream.transformToPair((x, time) -> null); - JavaDStream<Integer> pairTransformed1 = pairStream.transform(x -> null); - JavaDStream<Integer> pairTransformed2 = pairStream.transform((x, time) -> null); - JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(x -> null); - JavaPairDStream<String, String> pairTransformed4 = - pairStream.transformToPair((x, time) -> null); - - } - - @Test - public void testTransformWith() { - List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "dodgers"), - new Tuple2<>("new york", "yankees")), - Arrays.asList( - new Tuple2<>("california", "sharks"), - new Tuple2<>("new york", "rangers"))); - - List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "mets")), - Arrays.asList( - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "islanders"))); - - - List<Set<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("dodgers", "giants")), - new Tuple2<>("new york", - new Tuple2<>("yankees", "mets"))), - Sets.newHashSet( - new Tuple2<>("california", - new Tuple2<>("sharks", "ducks")), - new Tuple2<>("new york", - new Tuple2<>("rangers", "islanders")))); - - JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream1, 1); - JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); - - JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( - ssc, stringStringKVStream2, 1); - JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - - JavaPairDStream<String, Tuple2<String, String>> joined = - pairStream1.transformWithToPair(pairStream2,(x, y, z) -> x.join(y)); - - JavaTestUtils.attachTestOutputStream(joined); - List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - List<Set<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); - for (List<Tuple2<String, Tuple2<String, String>>> res : result) { - unorderedResult.add(Sets.newHashSet(res)); - } - - Assert.assertEquals(expected, unorderedResult); - } - - - @Test - public void testVariousTransformWith() { - // tests whether all variations of transformWith can be called from Java - - List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1)); - List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x")); - JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); - JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); - - List<List<Tuple2<String, Integer>>> pairInputData1 = - Arrays.asList(Arrays.asList(new Tuple2<>("x", 1))); - List<List<Tuple2<Double, Character>>> pairInputData2 = - Arrays.asList(Arrays.asList(new Tuple2<>(1.0, 'x'))); - JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); - JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); - - JavaDStream<Double> transformed1 = stream1.transformWith(stream2, (x, y, z) -> null); - JavaDStream<Double> transformed2 = stream1.transformWith(pairStream1,(x, y, z) -> null); - - JavaPairDStream<Double, Double> transformed3 = - stream1.transformWithToPair(stream2,(x, y, z) -> null); - - JavaPairDStream<Double, Double> transformed4 = - stream1.transformWithToPair(pairStream1,(x, y, z) -> null); - - JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(stream2,(x, y, z) -> null); - - JavaDStream<Double> pairTransformed2_ = - pairStream1.transformWith(pairStream1,(x, y, z) -> null); - - JavaPairDStream<Double, Double> pairTransformed3 = - pairStream1.transformWithToPair(stream2,(x, y, z) -> null); - - JavaPairDStream<Double, Double> pairTransformed4 = - pairStream1.transformWithToPair(pairStream2,(x, y, z) -> null); - } - - @Test - public void testStreamingContextTransform() { - List<List<Integer>> stream1input = Arrays.asList( - Arrays.asList(1), - Arrays.asList(2) - ); - - List<List<Integer>> stream2input = Arrays.asList( - Arrays.asList(3), - Arrays.asList(4) - ); - - List<List<Tuple2<Integer, String>>> pairStream1input = Arrays.asList( - Arrays.asList(new Tuple2<>(1, "x")), - Arrays.asList(new Tuple2<>(2, "y")) - ); - - List<List<Tuple2<Integer, Tuple2<Integer, String>>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>(1, new Tuple2<>(1, "x"))), - Arrays.asList(new Tuple2<>(2, new Tuple2<>(2, "y"))) - ); - - JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, stream1input, 1); - JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, stream2input, 1); - JavaPairDStream<Integer, String> pairStream1 = JavaPairDStream.fromJavaDStream( - JavaTestUtils.attachTestInputStream(ssc, pairStream1input, 1)); - - List<JavaDStream<?>> listOfDStreams1 = Arrays.<JavaDStream<?>>asList(stream1, stream2); - - // This is just to test whether this transform to JavaStream compiles - JavaDStream<Long> transformed1 = ssc.transform( - listOfDStreams1, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - Assert.assertEquals(2, listOfRDDs.size()); - return null; - }); - - List<JavaDStream<?>> listOfDStreams2 = - Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); - - JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( - listOfDStreams2, (List<JavaRDD<?>> listOfRDDs, Time time) -> { - Assert.assertEquals(3, listOfRDDs.size()); - JavaRDD<Integer> rdd1 = (JavaRDD<Integer>) listOfRDDs.get(0); - JavaRDD<Integer> rdd2 = (JavaRDD<Integer>) listOfRDDs.get(1); - JavaRDD<Tuple2<Integer, String>> rdd3 = (JavaRDD<Tuple2<Integer, String>>) listOfRDDs.get(2); - JavaPairRDD<Integer, String> prdd3 = JavaPairRDD.fromJavaRDD(rdd3); - PairFunction<Integer, Integer, Integer> mapToTuple = - (Integer i) -> new Tuple2<>(i, i); - return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); - }); - JavaTestUtils.attachTestOutputStream(transformed2); - List<List<Tuple2<Integer, Tuple2<Integer, String>>>> result = - JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMap() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("go", "giants"), - Arrays.asList("boo", "dodgers"), - Arrays.asList("athletics")); - - List<List<String>> expected = Arrays.asList( - Arrays.asList("g", "o", "g", "i", "a", "n", "t", "s"), - Arrays.asList("b", "o", "o", "d", "o", "d", "g", "e", "r", "s"), - Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s")); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)"))); - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - assertOrderInvariantEquals(expected, result); - } - - @Test - public void testForeachRDD() { - final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0); - final Accumulator<Integer> accumEle = ssc.sc().accumulator(0); - List<List<Integer>> inputData = Arrays.asList( - Arrays.asList(1,1,1), - Arrays.asList(1,1,1)); - - JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output - - stream.foreachRDD(rdd -> { - accumRdd.add(1); - rdd.foreach(x -> accumEle.add(1)); - }); - - // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java - stream.foreachRDD((rdd, time) -> null); - - JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(2, accumRdd.value().intValue()); - Assert.assertEquals(6, accumEle.value().intValue()); - } - - @Test - public void testPairFlatMap() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants"), - Arrays.asList("dodgers"), - Arrays.asList("athletics")); - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(6, "g"), - new Tuple2<>(6, "i"), - new Tuple2<>(6, "a"), - new Tuple2<>(6, "n"), - new Tuple2<>(6, "t"), - new Tuple2<>(6, "s")), - Arrays.asList( - new Tuple2<>(7, "d"), - new Tuple2<>(7, "o"), - new Tuple2<>(7, "d"), - new Tuple2<>(7, "g"), - new Tuple2<>(7, "e"), - new Tuple2<>(7, "r"), - new Tuple2<>(7, "s")), - Arrays.asList( - new Tuple2<>(9, "a"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "h"), - new Tuple2<>(9, "l"), - new Tuple2<>(9, "e"), - new Tuple2<>(9, "t"), - new Tuple2<>(9, "i"), - new Tuple2<>(9, "c"), - new Tuple2<>(9, "s"))); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(s -> { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); - for (String letter : s.split("(?!^)")) { - out.add(new Tuple2<>(s.length(), letter)); - } - return out; - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - /* - * Performs an order-invariant comparison of lists representing two RDD streams. This allows - * us to account for ordering variation within individual RDD's which occurs during windowing. - */ - public static <T extends Comparable<T>> void assertOrderInvariantEquals( - List<List<T>> expected, List<List<T>> actual) { - expected.forEach(list -> Collections.sort(list)); - List<List<T>> sortedActual = new ArrayList<>(); - actual.forEach(list -> { - List<T> sortedList = new ArrayList<>(list); - Collections.sort(sortedList); - sortedActual.add(sortedList); - }); - Assert.assertEquals(expected, sortedActual); - } - - @Test - public void testPairFilter() { - List<List<String>> inputData = Arrays.asList( - Arrays.asList("giants", "dodgers"), - Arrays.asList("yankees", "red sox")); - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("giants", 6)), - Arrays.asList(new Tuple2<>("yankees", 7))); - - JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = - stream.mapToPair(x -> new Tuple2<>(x, x.length())); - JavaPairDStream<String, Integer> filtered = pairStream.filter(x -> x._1().contains("a")); - JavaTestUtils.attachTestOutputStream(filtered); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers"), - new Tuple2<>("california", "giants"), - new Tuple2<>("new york", "yankees"), - new Tuple2<>("new york", "mets")), - Arrays.asList(new Tuple2<>("california", "sharks"), - new Tuple2<>("california", "ducks"), - new Tuple2<>("new york", "rangers"), - new Tuple2<>("new york", "islanders"))); - - List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 1), - new Tuple2<>("california", 3), - new Tuple2<>("new york", 4), - new Tuple2<>("new york", 1)), - Arrays.asList( - new Tuple2<>("california", 5), - new Tuple2<>("california", 5), - new Tuple2<>("new york", 3), - new Tuple2<>("new york", 1))); - - @Test - public void testPairMap() { // Maps pair -> pair of different type - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(x -> x.swap()); - JavaTestUtils.attachTestOutputStream(reversed); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairMapPartitions() { // Maps pair -> pair of different type - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "california"), - new Tuple2<>(3, "california"), - new Tuple2<>(4, "new york"), - new Tuple2<>(1, "new york")), - Arrays.asList( - new Tuple2<>(5, "california"), - new Tuple2<>(5, "california"), - new Tuple2<>(3, "new york"), - new Tuple2<>(1, "new york"))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(in -> { - LinkedList<Tuple2<Integer, String>> out = new LinkedList<>(); - while (in.hasNext()) { - Tuple2<String, Integer> next = in.next(); - out.add(next.swap()); - } - return out; - }); - - JavaTestUtils.attachTestOutputStream(reversed); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairMap2() { // Maps pair -> single - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(1, 3, 4, 1), - Arrays.asList(5, 5, 3, 1)); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream<Integer> reversed = pairStream.map(in -> in._2()); - JavaTestUtils.attachTestOutputStream(reversed); - List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair - List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2)), - Arrays.asList( - new Tuple2<>("hi", 1), - new Tuple2<>("ho", 2))); - - List<List<Tuple2<Integer, String>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o")), - Arrays.asList( - new Tuple2<>(1, "h"), - new Tuple2<>(1, "i"), - new Tuple2<>(2, "h"), - new Tuple2<>(2, "o"))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(in -> { - List<Tuple2<Integer, String>> out = new LinkedList<>(); - for (Character s : in._1().toCharArray()) { - out.add(new Tuple2<>(in._2(), s.toString())); - } - return out; - }); - - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairReduceByKey() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey((x, y) -> x + y); - - JavaTestUtils.attachTestOutputStream(reduced); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testCombineByKey() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList( - new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(i -> i, - (x, y) -> x + y, (x, y) -> x + y, new HashPartitioner(2)); - - JavaTestUtils.attachTestOutputStream(combined); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindow() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> reduceWindowed = - pairStream.reduceByKeyAndWindow((x, y) -> x + y, new Duration(2000), new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testUpdateStateByKey() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey((values, state) -> { - int out = 0; - if (state.isPresent()) { - out = out + state.get(); - } - for (Integer v : values) { - out = out + v; - } - return Optional.of(out); - }); - - JavaTestUtils.attachTestOutputStream(updated); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testReduceByKeyAndWindowWithInverse() { - List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; - - List<List<Tuple2<String, Integer>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", 4), - new Tuple2<>("new york", 5)), - Arrays.asList(new Tuple2<>("california", 14), - new Tuple2<>("new york", 9)), - Arrays.asList(new Tuple2<>("california", 10), - new Tuple2<>("new york", 4))); - - JavaDStream<Tuple2<String, Integer>> stream = - JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, Integer> reduceWindowed = - pairStream.reduceByKeyAndWindow((x, y) -> x + y, (x, y) -> x - y, new Duration(2000), - new Duration(1000)); - JavaTestUtils.attachTestOutputStream(reduceWindowed); - List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairTransform() { - List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5)), - Arrays.asList( - new Tuple2<>(1, 5), - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5))); - - JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(in -> in.sortByKey()); - - JavaTestUtils.attachTestOutputStream(sorted); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testPairToNormalRDDTransform() { - List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( - Arrays.asList( - new Tuple2<>(3, 5), - new Tuple2<>(1, 5), - new Tuple2<>(4, 5), - new Tuple2<>(2, 5)), - Arrays.asList( - new Tuple2<>(2, 5), - new Tuple2<>(3, 5), - new Tuple2<>(4, 5), - new Tuple2<>(1, 5))); - - List<List<Integer>> expected = Arrays.asList( - Arrays.asList(3, 1, 4, 2), - Arrays.asList(2, 3, 4, 1)); - - JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaDStream<Integer> firstParts = pairStream.transform(in -> in.map(x -> x._1())); - JavaTestUtils.attachTestOutputStream(firstParts); - List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testMapValues() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; - - List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "DODGERS"), - new Tuple2<>("california", "GIANTS"), - new Tuple2<>("new york", "YANKEES"), - new Tuple2<>("new york", "METS")), - Arrays.asList(new Tuple2<>("california", "SHARKS"), - new Tuple2<>("california", "DUCKS"), - new Tuple2<>("new york", "RANGERS"), - new Tuple2<>("new york", "ISLANDERS"))); - - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, String> mapped = pairStream.mapValues(String::toUpperCase); - JavaTestUtils.attachTestOutputStream(mapped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); - } - - @Test - public void testFlatMapValues() { - List<List<Tuple2<String, String>>> inputData = stringStringKVStream; - - List<List<Tuple2<String, String>>> expected = Arrays.asList( - Arrays.asList(new Tuple2<>("california", "dodgers1"), - new Tuple2<>("california", "dodgers2"), - new Tuple2<>("california", "giants1"), - new Tuple2<>("california", "giants2"), - new Tuple2<>("new york", "yankees1"), - new Tuple2<>("new york", "yankees2"), - new Tuple2<>("new york", "mets1"), - new Tuple2<>("new york", "mets2")), - Arrays.asList(new Tuple2<>("california", "sharks1"), - new Tuple2<>("california", "sharks2"), - new Tuple2<>("california", "ducks1"), - new Tuple2<>("california", "ducks2"), - new Tuple2<>("new york", "rangers1"), - new Tuple2<>("new york", "rangers2"), - new Tuple2<>("new york", "islanders1"), - new Tuple2<>("new york", "islanders2"))); - - JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( - ssc, inputData, 1); - JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - - JavaPairDStream<String, String> flatMapped = - pairStream.flatMapValues(in -> Arrays.asList(in + "1", in + "2")); - JavaTestUtils.attachTestOutputStream(flatMapped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - Assert.assertEquals(expected, result); - } - - /** - * This test is only for testing the APIs. It's not necessary to run it. - */ - public void testMapWithStateAPI() { - JavaPairRDD<String, Boolean> initialRDD = null; - JavaPairDStream<String, Integer> wordsDstream = null; - - JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> { - // Use all State's methods here - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return Optional.of(2.0); - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); - - JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots(); - - JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 = - wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> { - state.exists(); - state.get(); - state.isTimingOut(); - state.remove(); - state.update(true); - return 2.0; - }).initialState(initialRDD) - .numPartitions(10) - .partitioner(new HashPartitioner(10)) - .timeout(Durations.seconds(10))); - - JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots(); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/java8-tests/src/test/resources/log4j.properties b/extras/java8-tests/src/test/resources/log4j.properties deleted file mode 100644 index eb3b1999..0000000 --- a/extras/java8-tests/src/test/resources/log4j.properties +++ /dev/null @@ -1,28 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark-project.jetty=WARN -org.spark-project.jetty.LEVEL=WARN http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala ---------------------------------------------------------------------- diff --git a/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala b/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala deleted file mode 100644 index fa0681d..0000000 --- a/extras/java8-tests/src/test/scala/org/apache/spark/JDK8ScalaSuite.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -/** - * Test cases where JDK8-compiled Scala user code is used with Spark. - */ -class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext { - test("basic RDD closure test (SPARK-6152)") { - sc.parallelize(1 to 1000).map(x => x * x).count() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl-assembly/pom.xml b/extras/kinesis-asl-assembly/pom.xml deleted file mode 100644 index d1c38c7..0000000 --- a/extras/kinesis-asl-assembly/pom.xml +++ /dev/null @@ -1,181 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kinesis-asl-assembly_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Project Kinesis Assembly</name> - <url>http://spark.apache.org/</url> - - <properties> - <sbt.project.name>streaming-kinesis-asl-assembly</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <!-- - Demote already included in the Spark assembly. - --> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>net.java.dev.jets3t</groupId> - <artifactId>jets3t</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-ipc</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <classifier>${avro.mapred.classifier}</classifier> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <configuration> - <shadedArtifactAttached>false</shadedArtifactAttached> - <artifactSet> - <includes> - <include>*:*</include> - </includes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> - <resource>reference.conf</resource> - </transformer> - <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> - <resource>log4j.properties</resource> - </transformer> - <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/> - <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - </plugins> -</build> -</project> - http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/pom.xml ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml deleted file mode 100644 index 935155e..0000000 --- a/extras/kinesis-asl/pom.xml +++ /dev/null @@ -1,87 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -~ Licensed to the Apache Software Foundation (ASF) under one or more -~ contributor license agreements. See the NOTICE file distributed with -~ this work for additional information regarding copyright ownership. -~ The ASF licenses this file to You under the Apache License, Version 2.0 -~ (the "License"); you may not use this file except in compliance with -~ the License. You may obtain a copy of the License at -~ -~ http://www.apache.org/licenses/LICENSE-2.0 -~ -~ Unless required by applicable law or agreed to in writing, software -~ distributed under the License is distributed on an "AS IS" BASIS, -~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -~ See the License for the specific language governing permissions and -~ limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>2.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <!-- Kinesis integration is not included by default due to ASL-licensed code. --> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kinesis-asl_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Kinesis Integration</name> - - <properties> - <sbt.project.name>streaming-kinesis-asl</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>amazon-kinesis-client</artifactId> - <version>${aws.kinesis.client.version}</version> - </dependency> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>amazon-kinesis-producer</artifactId> - <version>${aws.kinesis.producer.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.binary.version}</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-test-tags_${scala.binary.version}</artifactId> - </dependency> - </dependencies> - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - </build> -</project> http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java deleted file mode 100644 index 5dc825d..0000000 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.examples.streaming; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.regex.Pattern; - -import com.amazonaws.regions.RegionUtils; -import org.apache.log4j.Logger; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kinesis.KinesisUtils; - -import scala.Tuple2; - -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; - -/** - * Consumes messages from a Amazon Kinesis streams and does wordcount. - * - * This example spins up 1 Kinesis Receiver per shard for the given stream. - * It then starts pulling from the last checkpointed sequence number of the given stream. - * - * Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url] [region-name] - * [app-name] is the name of the consumer app, used to track the read data in DynamoDB - * [stream-name] name of the Kinesis stream (ie. mySparkStream) - * [endpoint-url] endpoint of the Kinesis service - * (e.g. https://kinesis.us-east-1.amazonaws.com) - * - * - * Example: - * # export AWS keys if necessary - * $ export AWS_ACCESS_KEY_ID=[your-access-key] - * $ export AWS_SECRET_KEY=<your-secret-key> - * - * # run the example - * $ SPARK_HOME/bin/run-example streaming.JavaKinesisWordCountASL myAppName mySparkStream \ - * https://kinesis.us-east-1.amazonaws.com - * - * There is a companion helper class called KinesisWordProducerASL which puts dummy data - * onto the Kinesis stream. - * - * This code uses the DefaultAWSCredentialsProviderChain to find credentials - * in the following order: - * Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - * Java System Properties - aws.accessKeyId and aws.secretKey - * Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs - * Instance profile credentials - delivered through the Amazon EC2 metadata service - * For more information, see - * http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html - * - * See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on - * the Kinesis Spark Streaming integration. - */ -public final class JavaKinesisWordCountASL { // needs to be public for access from run-example - private static final Pattern WORD_SEPARATOR = Pattern.compile(" "); - private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class); - - public static void main(String[] args) { - // Check that all required args were passed in. - if (args.length != 3) { - System.err.println( - "Usage: JavaKinesisWordCountASL <stream-name> <endpoint-url>\n\n" + - " <app-name> is the name of the app, used to track the read data in DynamoDB\n" + - " <stream-name> is the name of the Kinesis stream\n" + - " <endpoint-url> is the endpoint of the Kinesis service\n" + - " (e.g. https://kinesis.us-east-1.amazonaws.com)\n" + - "Generate data for the Kinesis stream using the example KinesisWordProducerASL.\n" + - "See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more\n" + - "details.\n" - ); - System.exit(1); - } - - // Set default log4j logging level to WARN to hide Spark logs - StreamingExamples.setStreamingLogLevels(); - - // Populate the appropriate variables from the given args - String kinesisAppName = args[0]; - String streamName = args[1]; - String endpointUrl = args[2]; - - // Create a Kinesis client in order to determine the number of shards for the given stream - AmazonKinesisClient kinesisClient = - new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain()); - kinesisClient.setEndpoint(endpointUrl); - int numShards = - kinesisClient.describeStream(streamName).getStreamDescription().getShards().size(); - - - // In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard. - // This is not a necessity; if there are less receivers/DStreams than the number of shards, - // then the shards will be automatically distributed among the receivers and each receiver - // will receive data from multiple shards. - int numStreams = numShards; - - // Spark Streaming batch interval - Duration batchInterval = new Duration(2000); - - // Kinesis checkpoint interval. Same as batchInterval for this example. - Duration kinesisCheckpointInterval = batchInterval; - - // Get the region name from the endpoint URL to save Kinesis Client Library metadata in - // DynamoDB of the same region as the Kinesis stream - String regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName(); - - // Setup the Spark config and StreamingContext - SparkConf sparkConfig = new SparkConf().setAppName("JavaKinesisWordCountASL"); - JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, batchInterval); - - // Create the Kinesis DStreams - List<JavaDStream<byte[]>> streamsList = new ArrayList<>(numStreams); - for (int i = 0; i < numStreams; i++) { - streamsList.add( - KinesisUtils.createStream(jssc, kinesisAppName, streamName, endpointUrl, regionName, - InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2()) - ); - } - - // Union all the streams if there is more than 1 stream - JavaDStream<byte[]> unionStreams; - if (streamsList.size() > 1) { - unionStreams = jssc.union(streamsList.get(0), streamsList.subList(1, streamsList.size())); - } else { - // Otherwise, just use the 1 stream - unionStreams = streamsList.get(0); - } - - // Convert each line of Array[Byte] to String, and split into words - JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() { - @Override - public Iterator<String> call(byte[] line) { - String s = new String(line, StandardCharsets.UTF_8); - return Arrays.asList(WORD_SEPARATOR.split(s)).iterator(); - } - }); - - // Map each word to a (word, 1) tuple so we can reduce by key to count the words - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<String, Integer>(s, 1); - } - } - ).reduceByKey( - new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - } - ); - - // Print the first 10 wordCounts - wordCounts.print(); - - // Start the streaming context and await termination - jssc.start(); - jssc.awaitTermination(); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py b/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py deleted file mode 100644 index 51f8c5c..0000000 --- a/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py +++ /dev/null @@ -1,83 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" - Consumes messages from a Amazon Kinesis streams and does wordcount. - - This example spins up 1 Kinesis Receiver per shard for the given stream. - It then starts pulling from the last checkpointed sequence number of the given stream. - - Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name> - <app-name> is the name of the consumer app, used to track the read data in DynamoDB - <stream-name> name of the Kinesis stream (ie. mySparkStream) - <endpoint-url> endpoint of the Kinesis service - (e.g. https://kinesis.us-east-1.amazonaws.com) - - - Example: - # export AWS keys if necessary - $ export AWS_ACCESS_KEY_ID=<your-access-key> - $ export AWS_SECRET_KEY=<your-secret-key> - - # run the example - $ bin/spark-submit -jar extras/kinesis-asl/target/scala-*/\ - spark-streaming-kinesis-asl-assembly_*.jar \ - extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ - myAppName mySparkStream https://kinesis.us-east-1.amazonaws.com - - There is a companion helper class called KinesisWordProducerASL which puts dummy data - onto the Kinesis stream. - - This code uses the DefaultAWSCredentialsProviderChain to find credentials - in the following order: - Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY - Java System Properties - aws.accessKeyId and aws.secretKey - Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs - Instance profile credentials - delivered through the Amazon EC2 metadata service - For more information, see - http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html - - See http://spark.apache.org/docs/latest/streaming-kinesis-integration.html for more details on - the Kinesis Spark Streaming integration. -""" -from __future__ import print_function - -import sys - -from pyspark import SparkContext -from pyspark.streaming import StreamingContext -from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream - -if __name__ == "__main__": - if len(sys.argv) != 5: - print( - "Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>", - file=sys.stderr) - sys.exit(-1) - - sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl") - ssc = StreamingContext(sc, 1) - appName, streamName, endpointUrl, regionName = sys.argv[1:] - lines = KinesisUtils.createStream( - ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2) - counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a+b) - counts.pprint() - - ssc.start() - ssc.awaitTermination() http://git-wip-us.apache.org/repos/asf/spark/blob/256704c7/extras/kinesis-asl/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/main/resources/log4j.properties b/extras/kinesis-asl/src/main/resources/log4j.properties deleted file mode 100644 index 6cdc928..0000000 --- a/extras/kinesis-asl/src/main/resources/log4j.properties +++ /dev/null @@ -1,37 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -log4j.rootCategory=WARN, console - -# File appender -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=false -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n - -# Console appender -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.out -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n - -# Settings to quiet third party logs that are too verbose -log4j.logger.org.spark-project.jetty=WARN -log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR -log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO -log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org