Scala 2.11 support, with REPL and all build changes.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4af9de7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4af9de7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4af9de7d

Branch: refs/heads/scala-2.11-prashant
Commit: 4af9de7dc72a809e10f4a287b509dec4ca12ae53
Parents: 48a19a6
Author: Prashant Sharma <[email protected]>
Authored: Mon Oct 20 17:43:38 2014 +0530
Committer: Prashant Sharma <[email protected]>
Committed: Fri Nov 7 11:22:47 2014 +0530

----------------------------------------------------------------------
 .rat-excludes                                   | 1 +
 assembly/pom.xml                                | 8 +-
 bin/compute-classpath.sh                        | 8 +-
 conf/spark-env.sh.template                      | 3 +
 core/pom.xml                                    | 41 +-
 dev/change-version-to-2.10.sh                   | 20 +
 dev/change-version-to-2.11.sh                   | 20 +
 examples/pom.xml                                | 149 +-
 .../examples/streaming/JavaKafkaWordCount.java  | 113 ++
 .../examples/streaming/KafkaWordCount.scala     | 102 ++
 .../examples/streaming/TwitterAlgebirdCMS.scala | 114 ++
 .../examples/streaming/TwitterAlgebirdHLL.scala | 92 ++
 .../examples/streaming/JavaKafkaWordCount.java  | 113 --
 .../examples/streaming/KafkaWordCount.scala     | 102 --
 .../examples/streaming/TwitterAlgebirdCMS.scala | 114 --
 .../examples/streaming/TwitterAlgebirdHLL.scala | 92 --
 external/mqtt/pom.xml                           | 5 -
 pom.xml                                         | 114 +-
 project/SparkBuild.scala                        | 12 +-
 project/project/SparkPluginBuild.scala          | 2 +-
 repl/pom.xml                                    | 90 +-
 .../main/scala/org/apache/spark/repl/Main.scala | 33 +
 .../apache/spark/repl/SparkCommandLine.scala    | 37 +
 .../org/apache/spark/repl/SparkExprTyper.scala  | 114 ++
 .../org/apache/spark/repl/SparkHelper.scala     | 22 +
 .../org/apache/spark/repl/SparkILoop.scala      | 1091 +++++++++++++
 .../org/apache/spark/repl/SparkILoopInit.scala  | 147 ++
 .../org/apache/spark/repl/SparkIMain.scala      | 1445 ++++++++++++++++++
 .../org/apache/spark/repl/SparkImports.scala    | 238 +++
 .../spark/repl/SparkJLineCompletion.scala       | 377 +++++
 .../apache/spark/repl/SparkJLineReader.scala    | 90 ++
 .../apache/spark/repl/SparkMemberHandlers.scala | 232 +++
 .../apache/spark/repl/SparkRunnerSettings.scala | 32 +
 .../scala/org/apache/spark/repl/ReplSuite.scala | 318 ++++
 .../main/scala/org/apache/spark/repl/Main.scala | 85 ++
 .../org/apache/spark/repl/SparkExprTyper.scala  | 86 ++
 .../org/apache/spark/repl/SparkILoop.scala      | 966 ++++++++++++
 .../org/apache/spark/repl/SparkIMain.scala      | 1319 ++++++++++++++++
 .../org/apache/spark/repl/SparkImports.scala    | 201 +++
 .../spark/repl/SparkJLineCompletion.scala       | 350 +++++
 .../apache/spark/repl/SparkMemberHandlers.scala | 221 +++
 .../apache/spark/repl/SparkReplReporter.scala   | 53 +
 .../scala/org/apache/spark/repl/ReplSuite.scala | 326 ++++
 .../main/scala/org/apache/spark/repl/Main.scala | 33 -
 .../apache/spark/repl/SparkCommandLine.scala    | 37 -
 .../org/apache/spark/repl/SparkExprTyper.scala  | 114 --
 .../org/apache/spark/repl/SparkHelper.scala     | 22 -
 .../org/apache/spark/repl/SparkILoop.scala      | 1091 -------------
 .../org/apache/spark/repl/SparkILoopInit.scala  | 147 --
 .../org/apache/spark/repl/SparkIMain.scala      | 1445 ------------------
 .../org/apache/spark/repl/SparkImports.scala    | 238 ---
 .../spark/repl/SparkJLineCompletion.scala       | 377 -----
 .../apache/spark/repl/SparkJLineReader.scala    | 90 --
 .../apache/spark/repl/SparkMemberHandlers.scala | 232 ---
 .../apache/spark/repl/SparkRunnerSettings.scala | 32 -
 .../scala/org/apache/spark/repl/ReplSuite.scala | 318 ----
 sql/catalyst/pom.xml                            | 29 +-
 57 files changed, 8602 insertions(+), 4701 deletions(-)
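For readers following the build changes below, a minimal usage sketch of how a Scala 2.11 build would be driven once this commit is applied. The script path, the scala-2.11 profile id and the SBT_MAVEN_PROFILES variable are taken from this diff; the particular mvn/sbt goals shown are only assumed for illustration and are not part of the commit itself.

    # Rewrite every *_2.10 artifactId in the pom.xml files to *_2.11 (script added in this commit)
    ./dev/change-version-to-2.11.sh

    # Maven: activate the new scala-2.11 profile defined in the root pom.xml
    mvn -Pscala-2.11 -DskipTests clean package

    # sbt: SparkBuild.scala reads profiles from SBT_MAVEN_PROFILES and defaults to scala-2.10 otherwise
    SBT_MAVEN_PROFILES="scala-2.11" sbt/sbt assembly

Switching back is symmetric via dev/change-version-to-2.10.sh; the scala-2.10 profile remains active by default.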
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/.rat-excludes ---------------------------------------------------------------------- diff --git a/.rat-excludes b/.rat-excludes index 20e3372..d8bee1f 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -44,6 +44,7 @@ SparkImports.scala SparkJLineCompletion.scala SparkJLineReader.scala SparkMemberHandlers.scala +SparkReplReporter.scala sbt sbt-launch-lib.bash plugins.sbt http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/assembly/pom.xml ---------------------------------------------------------------------- diff --git a/assembly/pom.xml b/assembly/pom.xml index 31a01e4..e592220 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -66,22 +66,22 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_${scala.binary.version}</artifactId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <artifactId>spark-graphx_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-graphx_${scala.binary.version}</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.binary.version}</artifactId> + <artifactId>spark-repl_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/bin/compute-classpath.sh ---------------------------------------------------------------------- diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh index 905bbaf..993d260 100755 --- a/bin/compute-classpath.sh +++ b/bin/compute-classpath.sh @@ -20,7 +20,7 @@ # This script computes Spark's classpath and prints it to stdout; it's used by both the "run" # script and the ExecutorRunner in standalone cluster mode. -SCALA_VERSION=2.10 +SCALA_VERSION=${SCALA_VERSION:-"2.10"} # Figure out where Spark is installed FWDIR="$(cd "`dirname "$0"`"/..; pwd)" @@ -121,6 +121,9 @@ if [ -n "$datanucleus_jars" ]; then fi fi +test_jars=$(find "$FWDIR"/lib_managed/test \( -name '*jar' -a -type f \) 2>/dev/null | \ + tr "\n" : | sed s/:$//g) + # Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1 if [[ $SPARK_TESTING == 1 ]]; then CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes" @@ -132,6 +135,9 @@ if [[ $SPARK_TESTING == 1 ]]; then CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/test-classes" CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/test-classes" + if [[ $SCALA_VERSION == "2.11" ]]; then + CLASSPATH="$CLASSPATH:$test_jars" + fi fi # Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail ! 
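A note on the compute-classpath.sh hunk above: SCALA_VERSION is no longer hard-coded but defaults to 2.10 through bash parameter expansion, so the launch scripts can be pointed at the 2.11 build output purely via the environment. A small, assumed usage sketch (the variable name and value come from this diff; the invocations themselves are only illustrative):

    # One-off override from the shell
    SCALA_VERSION=2.11 ./bin/compute-classpath.sh

    # Or persist it for all Spark scripts in conf/spark-env.sh,
    # as the spark-env.sh.template change below suggests:
    # SCALA_VERSION=2.11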
http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/conf/spark-env.sh.template ---------------------------------------------------------------------- diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template index f8ffbf6..6a5622e 100755 --- a/conf/spark-env.sh.template +++ b/conf/spark-env.sh.template @@ -3,6 +3,9 @@ # This file is sourced when running various Spark programs. # Copy it as spark-env.sh and edit that to configure Spark for your site. +# Uncomment this if you plan to use scala 2.11 +# SCALA_VERSION=2.11 + # Options read when launching programs locally with # ./bin/run-example or ./bin/spark-submit # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index 41296e0..624aa96 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -35,6 +35,34 @@ <url>http://spark.apache.org/</url> <dependencies> <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill_${scala.binary.version}</artifactId> + <exclusions> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-commons</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill-java</artifactId> + <exclusions> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-commons</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <exclusions> @@ -133,14 +161,6 @@ <artifactId>lz4</artifactId> </dependency> <dependency> - <groupId>com.twitter</groupId> - <artifactId>chill_${scala.binary.version}</artifactId> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>chill-java</artifactId> - </dependency> - <dependency> <groupId>org.roaringbitmap</groupId> <artifactId>RoaringBitmap</artifactId> </dependency> @@ -277,6 +297,10 @@ <scope>test</scope> </dependency> <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill-java</artifactId> + </dependency> + <dependency> <groupId>asm</groupId> <artifactId>asm</artifactId> <scope>test</scope> @@ -424,4 +448,5 @@ </resource> </resources> </build> + </project> http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/dev/change-version-to-2.10.sh ---------------------------------------------------------------------- diff --git a/dev/change-version-to-2.10.sh b/dev/change-version-to-2.10.sh new file mode 100755 index 0000000..ca48e6f --- /dev/null +++ b/dev/change-version-to-2.10.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +find -name 'pom.xml' -exec sed -i 's|\(artifactId.*\)_2.11|\1_2.10|g' {} \; http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/dev/change-version-to-2.11.sh ---------------------------------------------------------------------- diff --git a/dev/change-version-to-2.11.sh b/dev/change-version-to-2.11.sh new file mode 100755 index 0000000..07056b1 --- /dev/null +++ b/dev/change-version-to-2.11.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +find -name 'pom.xml' -exec sed -i 's|\(artifactId.*\)_2.10|\1_2.11|g' {} \; http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index bc32918..e80c637 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -34,24 +34,6 @@ <name>Spark Project Examples</name> <url>http://spark.apache.org/</url> - <profiles> - <profile> - <id>kinesis-asl</id> - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>${commons.httpclient.version}</version> - </dependency> - </dependencies> - </profile> - </profiles> - <dependencies> <!-- Promote Guava to compile scope in this module so it's included while shading. 
--> <dependency> @@ -102,22 +84,17 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId> + <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId> + <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -152,10 +129,6 @@ <artifactId>jetty-server</artifactId> </dependency> <dependency> - <groupId>com.twitter</groupId> - <artifactId>algebird-core_${scala.binary.version}</artifactId> - <version>0.1.11</version> - </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-math3</artifactId> @@ -292,4 +265,122 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>kinesis-asl</id> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>${commons.httpclient.version}</version> + </dependency> + </dependencies> + </profile> + <profile> + <id>scala-2.10</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>algebird-core_${scala.binary.version}</artifactId> + <version>0.1.11</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-scala-sources</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + <source>scala-2.10/src/main/scala</source> + <source>scala-2.10/src/main/java</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-scala-test-sources</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + <source>scala-2.10/src/test/scala</source> + <source>scala-2.10/src/test/java</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>scala-2.11</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <dependencies> + <!-- Streaming Kafka and zeromq modules are disabled for now. 
--> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-scala-sources</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/scala</source> + <source>scala-2.11/src/main/scala</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-scala-test-sources</id> + <phase>generate-test-sources</phase> + <goals> + <goal>add-test-source</goal> + </goals> + <configuration> + <sources> + <source>src/test/scala</source> + <source>scala-2.11/src/test/scala</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java new file mode 100644 index 0000000..16ae9a3 --- /dev/null +++ b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.Map; +import java.util.HashMap; +import java.util.regex.Pattern; + + +import scala.Tuple2; + +import com.google.common.collect.Lists; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.examples.streaming.StreamingExamples; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. 
+ * + * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads> + * <zkQuorum> is a list of one or more zookeeper servers that make quorum + * <group> is the name of kafka consumer group + * <topics> is a list of one or more kafka topics to consume from + * <numThreads> is the number of threads the kafka consumer should use + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ + * zoo03 my-consumer-group topic1,topic2 1` + */ + +public final class JavaKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaKafkaWordCount() { + } + + public static void main(String[] args) { + if (args.length < 4) { + System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); + // Create the context with a 1 second batch size + JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); + + int numThreads = Integer.parseInt(args[3]); + Map<String, Integer> topicMap = new HashMap<String, Integer>(); + String[] topics = args[2].split(","); + for (String topic: topics) { + topicMap.put(topic, numThreads); + } + + JavaPairReceiverInputDStream<String, String> messages = + KafkaUtils.createStream(jssc, args[0], args[1], topicMap); + + JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { + @Override + public String call(Tuple2<String, String> tuple2) { + return tuple2._2(); + } + }); + + JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(SPACE.split(x)); + } + }); + + JavaPairDStream<String, Integer> wordCounts = words.mapToPair( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2<String, Integer> call(String s) { + return new Tuple2<String, Integer>(s, 1); + } + }).reduceByKey(new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + + wordCounts.print(); + jssc.start(); + jssc.awaitTermination(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala new file mode 100644 index 0000000..c9e1511 --- /dev/null +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import java.util.Properties + +import kafka.producer._ + +import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.kafka._ +import org.apache.spark.SparkConf + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads> + * <zkQuorum> is a list of one or more zookeeper servers that make quorum + * <group> is the name of kafka consumer group + * <topics> is a list of one or more kafka topics to consume from + * <numThreads> is the number of threads the kafka consumer should use + * + * Example: + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \ + * my-consumer-group topic1,topic2 1` + */ +object KafkaWordCount { + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(zkQuorum, group, topics, numThreads) = args + val sparkConf = new SparkConf().setAppName("KafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) + ssc.checkpoint("checkpoint") + + val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap + val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1L)) + .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} + +// Produces some random words between 1 and 100. 
+object KafkaWordCountProducer { + + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " + + "<messagesPerSec> <wordsPerMessage>") + System.exit(1) + } + + val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args + + // Zookeper connection properties + val props = new Properties() + props.put("metadata.broker.list", brokers) + props.put("serializer.class", "kafka.serializer.StringEncoder") + + val config = new ProducerConfig(props) + val producer = new Producer[String, String](config) + + // Send some messages + while(true) { + val messages = (1 to messagesPerSec.toInt).map { messageNum => + val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) + .mkString(" ") + + new KeyedMessage[String, String](topic, str) + }.toArray + + producer.send(messages: _*) + Thread.sleep(100) + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala ---------------------------------------------------------------------- diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala new file mode 100644 index 0000000..683752a --- /dev/null +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import com.twitter.algebird._ + +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.twitter._ + +// scalastyle:off +/** + * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute + * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. + * <br> + * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs, + * the example operates on Long IDs. Once the implementation supports other inputs (such as String), + * the same approach could be used for computing popular topics for example. + * <p> + * <p> + * <a href= + * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> + * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data + * structure for approximate frequency estimation in data streams (e.g. 
Top-K elements, frequency + * of any given element, etc), that uses space sub-linear in the number of elements in the + * stream. Once elements are added to the CMS, the estimated count of an element can be computed, + * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total + * count. + * <p><p> + * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the + * reduce operation. + */ +// scalastyle:on +object TwitterAlgebirdCMS { + def main(args: Array[String]) { + StreamingExamples.setStreamingLogLevels() + + // CMS parameters + val DELTA = 1E-3 + val EPS = 0.01 + val SEED = 1 + val PERC = 0.001 + // K highest frequency elements to take + val TOPK = 10 + + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS") + val ssc = new StreamingContext(sparkConf, Seconds(10)) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) + + val users = stream.map(status => status.getUser.getId) + + val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) + var globalCMS = cms.zero + val mm = new MapMonoid[Long, Int]() + var globalExact = Map[Long, Int]() + + val approxTopUsers = users.mapPartitions(ids => { + ids.map(id => cms.create(id)) + }).reduce(_ ++ _) + + val exactTopUsers = users.map(id => (id, 1)) + .reduceByKey((a, b) => a + b) + + approxTopUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + val partialTopK = partial.heavyHitters.map(id => + (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + globalCMS ++= partial + val globalTopK = globalCMS.heavyHitters.map(id => + (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, + partialTopK.mkString("[", ",", "]"))) + println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, + globalTopK.mkString("[", ",", "]"))) + } + }) + + exactTopUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partialMap = rdd.collect().toMap + val partialTopK = rdd.map( + {case (id, count) => (count, id)}) + .sortByKey(ascending = false).take(TOPK) + globalExact = mm.plus(globalExact.toMap, partialMap) + val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) + println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]"))) + } + }) + + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala ---------------------------------------------------------------------- diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala new file mode 100644 index 0000000..62db5e6 --- /dev/null +++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import com.twitter.algebird.HyperLogLogMonoid +import com.twitter.algebird.HyperLogLog._ + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.twitter._ +import org.apache.spark.SparkConf + +// scalastyle:off +/** + * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute + * a windowed and global estimate of the unique user IDs occurring in a Twitter stream. + * <p> + * <p> + * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> + * blog post</a> and this + * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html"> + * blog post</a> + * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for + * estimating the cardinality of a data stream, i.e. the number of unique elements. + * <p><p> + * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the + * reduce operation. + */ +// scalastyle:on +object TwitterAlgebirdHLL { + def main(args: Array[String]) { + + StreamingExamples.setStreamingLogLevels() + + /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ + val BIT_SIZE = 12 + val filters = args + val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL") + val ssc = new StreamingContext(sparkConf, Seconds(5)) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) + + val users = stream.map(status => status.getUser.getId) + + val hll = new HyperLogLogMonoid(BIT_SIZE) + var globalHll = hll.zero + var userSet: Set[Long] = Set() + + val approxUsers = users.mapPartitions(ids => { + ids.map(id => hll(id)) + }).reduce(_ + _) + + val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) + + approxUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + globalHll += partial + println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) + } + }) + + exactUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + userSet ++= partial + println("Exact distinct users this batch: %d".format(partial.size)) + println("Exact distinct users overall: %d".format(userSet.size)) + println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1 + ) * 100)) + } + }) + + ssc.start() + ssc.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java 
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java deleted file mode 100644 index 16ae9a3..0000000 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming; - -import java.util.Map; -import java.util.HashMap; -import java.util.regex.Pattern; - - -import scala.Tuple2; - -import com.google.common.collect.Lists; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.examples.streaming.StreamingExamples; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kafka.KafkaUtils; - -/** - * Consumes messages from one or more topics in Kafka and does wordcount. 
- * - * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads> - * <zkQuorum> is a list of one or more zookeeper servers that make quorum - * <group> is the name of kafka consumer group - * <topics> is a list of one or more kafka topics to consume from - * <numThreads> is the number of threads the kafka consumer should use - * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ - * zoo03 my-consumer-group topic1,topic2 1` - */ - -public final class JavaKafkaWordCount { - private static final Pattern SPACE = Pattern.compile(" "); - - private JavaKafkaWordCount() { - } - - public static void main(String[] args) { - if (args.length < 4) { - System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); - // Create the context with a 1 second batch size - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); - - int numThreads = Integer.parseInt(args[3]); - Map<String, Integer> topicMap = new HashMap<String, Integer>(); - String[] topics = args[2].split(","); - for (String topic: topics) { - topicMap.put(topic, numThreads); - } - - JavaPairReceiverInputDStream<String, String> messages = - KafkaUtils.createStream(jssc, args[0], args[1], topicMap); - - JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { - @Override - public String call(Tuple2<String, String> tuple2) { - return tuple2._2(); - } - }); - - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterable<String> call(String x) { - return Lists.newArrayList(SPACE.split(x)); - } - }); - - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<String, Integer>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); - - wordCounts.print(); - jssc.start(); - jssc.awaitTermination(); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala deleted file mode 100644 index c9e1511..0000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming - -import java.util.Properties - -import kafka.producer._ - -import org.apache.spark.streaming._ -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.kafka._ -import org.apache.spark.SparkConf - -/** - * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads> - * <zkQuorum> is a list of one or more zookeeper servers that make quorum - * <group> is the name of kafka consumer group - * <topics> is a list of one or more kafka topics to consume from - * <numThreads> is the number of threads the kafka consumer should use - * - * Example: - * `$ bin/run-example \ - * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \ - * my-consumer-group topic1,topic2 1` - */ -object KafkaWordCount { - def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(zkQuorum, group, topics, numThreads) = args - val sparkConf = new SparkConf().setAppName("KafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - ssc.checkpoint("checkpoint") - - val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1L)) - .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) - wordCounts.print() - - ssc.start() - ssc.awaitTermination() - } -} - -// Produces some random words between 1 and 100. -object KafkaWordCountProducer { - - def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " + - "<messagesPerSec> <wordsPerMessage>") - System.exit(1) - } - - val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args - - // Zookeper connection properties - val props = new Properties() - props.put("metadata.broker.list", brokers) - props.put("serializer.class", "kafka.serializer.StringEncoder") - - val config = new ProducerConfig(props) - val producer = new Producer[String, String](config) - - // Send some messages - while(true) { - val messages = (1 to messagesPerSec.toInt).map { messageNum => - val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) - .mkString(" ") - - new KeyedMessage[String, String](topic, str) - }.toArray - - producer.send(messages: _*) - Thread.sleep(100) - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala deleted file mode 100644 index 683752a..0000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming - -import com.twitter.algebird._ - -import org.apache.spark.SparkConf -import org.apache.spark.SparkContext._ -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.twitter._ - -// scalastyle:off -/** - * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute - * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. - * <br> - * <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs, - * the example operates on Long IDs. Once the implementation supports other inputs (such as String), - * the same approach could be used for computing popular topics for example. - * <p> - * <p> - * <a href= - * "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> - * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data - * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency - * of any given element, etc), that uses space sub-linear in the number of elements in the - * stream. Once elements are added to the CMS, the estimated count of an element can be computed, - * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total - * count. - * <p><p> - * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the - * reduce operation. 
- */ -// scalastyle:on -object TwitterAlgebirdCMS { - def main(args: Array[String]) { - StreamingExamples.setStreamingLogLevels() - - // CMS parameters - val DELTA = 1E-3 - val EPS = 0.01 - val SEED = 1 - val PERC = 0.001 - // K highest frequency elements to take - val TOPK = 10 - - val filters = args - val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS") - val ssc = new StreamingContext(sparkConf, Seconds(10)) - val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) - - val users = stream.map(status => status.getUser.getId) - - val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) - var globalCMS = cms.zero - val mm = new MapMonoid[Long, Int]() - var globalExact = Map[Long, Int]() - - val approxTopUsers = users.mapPartitions(ids => { - ids.map(id => cms.create(id)) - }).reduce(_ ++ _) - - val exactTopUsers = users.map(id => (id, 1)) - .reduceByKey((a, b) => a + b) - - approxTopUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - val partialTopK = partial.heavyHitters.map(id => - (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) - globalCMS ++= partial - val globalTopK = globalCMS.heavyHitters.map(id => - (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) - println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, - partialTopK.mkString("[", ",", "]"))) - println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, - globalTopK.mkString("[", ",", "]"))) - } - }) - - exactTopUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partialMap = rdd.collect().toMap - val partialTopK = rdd.map( - {case (id, count) => (count, id)}) - .sortByKey(ascending = false).take(TOPK) - globalExact = mm.plus(globalExact.toMap, partialMap) - val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) - println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) - println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]"))) - } - }) - - ssc.start() - ssc.awaitTermination() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala deleted file mode 100644 index 62db5e6..0000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.examples.streaming - -import com.twitter.algebird.HyperLogLogMonoid -import com.twitter.algebird.HyperLogLog._ - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.twitter._ -import org.apache.spark.SparkConf - -// scalastyle:off -/** - * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute - * a windowed and global estimate of the unique user IDs occurring in a Twitter stream. - * <p> - * <p> - * This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> - * blog post</a> and this - * <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html"> - * blog post</a> - * have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for - * estimating the cardinality of a data stream, i.e. the number of unique elements. - * <p><p> - * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the - * reduce operation. - */ -// scalastyle:on -object TwitterAlgebirdHLL { - def main(args: Array[String]) { - - StreamingExamples.setStreamingLogLevels() - - /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ - val BIT_SIZE = 12 - val filters = args - val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL") - val ssc = new StreamingContext(sparkConf, Seconds(5)) - val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) - - val users = stream.map(status => status.getUser.getId) - - val hll = new HyperLogLogMonoid(BIT_SIZE) - var globalHll = hll.zero - var userSet: Set[Long] = Set() - - val approxUsers = users.mapPartitions(ids => { - ids.map(id => hll(id)) - }).reduce(_ + _) - - val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - - approxUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - globalHll += partial - println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) - println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) - } - }) - - exactUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - userSet ++= partial - println("Exact distinct users this batch: %d".format(partial.size)) - println("Exact distinct users overall: %d".format(userSet.size)) - println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1 - ) * 100)) - } - }) - - ssc.start() - ssc.awaitTermination() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/external/mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 371f1f1..362a76e 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -53,11 +53,6 @@ <version>0.4.0</version> </dependency> <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-zeromq_${scala.binary.version}</artifactId> - <version>${akka.version}</version> - </dependency> - <dependency> <groupId>org.scalatest</groupId> <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 88ef67c..1bd4970 100644 --- a/pom.xml +++ b/pom.xml @@ -97,30 +97,26 @@ 
<module>sql/catalyst</module> <module>sql/core</module> <module>sql/hive</module> - <module>repl</module> <module>assembly</module> <module>external/twitter</module> - <module>external/kafka</module> <module>external/flume</module> <module>external/flume-sink</module> - <module>external/zeromq</module> <module>external/mqtt</module> + <module>external/zeromq</module> <module>examples</module> + <module>repl</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - + <akka.group>org.spark-project.akka</akka.group> + <akka.version>2.3.4-spark</akka.version> <java.version>1.6</java.version> <sbt.project.name>spark</sbt.project.name> - <scala.version>2.10.4</scala.version> - <scala.binary.version>2.10</scala.binary.version> <scala.macros.version>2.0.1</scala.macros.version> <mesos.version>0.18.1</mesos.version> <mesos.classifier>shaded-protobuf</mesos.classifier> - <akka.group>org.spark-project.akka</akka.group> - <akka.version>2.3.4-spark</akka.version> <slf4j.version>1.7.5</slf4j.version> <log4j.version>1.2.17</log4j.version> <hadoop.version>1.0.4</hadoop.version> @@ -137,7 +133,7 @@ <parquet.version>1.6.0rc3</parquet.version> <jblas.version>1.2.3</jblas.version> <jetty.version>8.1.14.v20131031</jetty.version> - <chill.version>0.3.6</chill.version> + <chill.version>0.5.0</chill.version> <codahale.metrics.version>3.0.0</codahale.metrics.version> <avro.version>1.7.6</avro.version> <avro.mapred.classifier></avro.mapred.classifier> @@ -281,6 +277,41 @@ <dependencyManagement> <dependencies> <dependency> + <groupId>${jline.groupid}</groupId> + <artifactId>jline</artifactId> + <version>${jline.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill_${scala.binary.version}</artifactId> + <version>${chill.version}</version> + <exclusions> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-commons</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>chill-java</artifactId> + <version>${chill.version}</version> + <exclusions> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-commons</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-util</artifactId> <version>${jetty.version}</version> @@ -396,36 +427,6 @@ <version>${protobuf.version}</version> </dependency> <dependency> - <groupId>com.twitter</groupId> - <artifactId>chill_${scala.binary.version}</artifactId> - <version>${chill.version}</version> - <exclusions> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm-commons</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>chill-java</artifactId> - <version>${chill.version}</version> - <exclusions> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm-commons</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>${akka.group}</groupId> 
         <artifactId>akka-actor_${scala.binary.version}</artifactId>
         <version>${akka.version}</version>
@@ -514,11 +515,6 @@
       </dependency>
       <dependency>
         <groupId>org.scala-lang</groupId>
-        <artifactId>jline</artifactId>
-        <version>${scala.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.scala-lang</groupId>
         <artifactId>scala-library</artifactId>
         <version>${scala.version}</version>
       </dependency>
@@ -1366,5 +1362,35 @@
         <derby.version>10.10.1.1</derby.version>
       </properties>
     </profile>
+
+    <profile>
+      <id>scala-2.10</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <scala.version>2.10.4</scala.version>
+        <scala.binary.version>2.10</scala.binary.version>
+        <jline.version>${scala.version}</jline.version>
+        <jline.groupid>org.scala-lang</jline.groupid>
+      </properties>
+      <modules>
+        <module>external/kafka</module>
+      </modules>
+    </profile>
+
+    <profile>
+      <id>scala-2.11</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <scala.version>2.11.2</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <jline.version>2.12</jline.version>
+        <jline.groupid>jline</jline.groupid>
+      </properties>
+    </profile>
+
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 657e4b4..349cc27 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -91,13 +91,21 @@ object SparkBuild extends PomBuild {
     profiles
   }

-  override val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match {
+  override val profiles = {
+    val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match {
     case None => backwardCompatibility
     case Some(v) =>
       if (backwardCompatibility.nonEmpty)
         println("Note: We ignore environment variables, when use of profile is detected in " +
           "conjunction with environment variable.")
       v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
+    }
+    if(profiles.exists(_.contains("scala"))) {
+      profiles
+    } else {
+      println("Enabled default scala profile")
+      profiles ++ Seq("scala-2.10")
+    }
   }

   Properties.envOrNone("SBT_MAVEN_PROPERTIES") match {
@@ -354,7 +362,7 @@
     javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
       .split(" ").toSeq,
     javaOptions += "-Xmx3g",
-
+    retrievePattern := "[conf]/[artifact](-[revision]).[ext]",
     // Show full stack trace and duration in test cases.
     testOptions in Test += Tests.Argument("-oDF"),
     testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/project/project/SparkPluginBuild.scala
----------------------------------------------------------------------
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 3ef2d54..8863f27 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -26,7 +26,7 @@ import sbt.Keys._
 object SparkPluginDef extends Build {
   lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle, sbtPomReader)
   lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings)
-  lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git")
+  lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git#ignore_artifact_id")

   // There is actually no need to publish this artifact.
   def styleSettings = Defaults.defaultSettings ++ Seq (

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/pom.xml
----------------------------------------------------------------------
diff --git a/repl/pom.xml b/repl/pom.xml
index af528c8..bd688c8 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -39,6 +39,11 @@

   <dependencies>
     <dependency>
+      <groupId>${jline.groupid}</groupId>
+      <artifactId>jline</artifactId>
+      <version>${jline.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
@@ -76,11 +81,6 @@
       <version>${scala.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>jline</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jul-to-slf4j</artifactId>
     </dependency>
@@ -124,4 +124,84 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>scala-2.10</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/scala</source>
+                    <source>scala-2.10/src/main/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/scala</source>
+                    <source>scala-2.10/src/test/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>scala-2.11</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/scala</source>
+                    <source>scala-2.11/src/main/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/scala</source>
+                    <source>scala-2.11/src/test/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
new file mode 100644
index 0000000..14b448d
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.collection.mutable.Set
+
+object Main {
+  private var _interp: SparkILoop = _
+
+  def interp = _interp
+
+  def interp_=(i: SparkILoop) { _interp = i }
+
+  def main(args: Array[String]) {
+    _interp = new SparkILoop
+    _interp.process(args)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
new file mode 100644
index 0000000..0581694
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc.{Settings, CompilerCommand}
+import scala.Predef._
+
+/**
+ * Command class enabling Spark-specific command line options (provided by
+ * <i>org.apache.spark.repl.SparkRunnerSettings</i>).
+ */
+class SparkCommandLine(args: List[String], override val settings: Settings)
+  extends CompilerCommand(args, settings) {
+
+  def this(args: List[String], error: String => Unit) {
+    this(args, new SparkRunnerSettings(error))
+  }
+
+  def this(args: List[String]) {
+    this(args, str => Console.println("Error: " + str))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
new file mode 100644
index 0000000..f8432c8
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -0,0 +1,114 @@
+// scalastyle:off
+
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author Paul Phillips
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc._
+import scala.tools.nsc.interpreter._
+
+import scala.reflect.internal.util.BatchSourceFile
+import scala.tools.nsc.ast.parser.Tokens.EOF
+
+import org.apache.spark.Logging
+
+trait SparkExprTyper extends Logging {
+  val repl: SparkIMain
+
+  import repl._
+  import global.{ reporter => _, Import => _, _ }
+  import definitions._
+  import syntaxAnalyzer.{ UnitParser, UnitScanner, token2name }
+  import naming.freshInternalVarName
+
+  object codeParser extends { val global: repl.global.type = repl.global } with CodeHandlers[Tree] {
+    def applyRule[T](code: String, rule: UnitParser => T): T = {
+      reporter.reset()
+      val scanner = newUnitParser(code)
+      val result = rule(scanner)
+
+      if (!reporter.hasErrors)
+        scanner.accept(EOF)
+
+      result
+    }
+
+    def defns(code: String) = stmts(code) collect { case x: DefTree => x }
+    def expr(code: String) = applyRule(code, _.expr())
+    def stmts(code: String) = applyRule(code, _.templateStats())
+    def stmt(code: String) = stmts(code).last // guaranteed nonempty
+  }
+
+  /** Parse a line into a sequence of trees. Returns None if the input is incomplete. */
+  def parse(line: String): Option[List[Tree]] = debugging(s"""parse("$line")""") {
+    var isIncomplete = false
+    reporter.withIncompleteHandler((_, _) => isIncomplete = true) {
+      val trees = codeParser.stmts(line)
+      if (reporter.hasErrors) {
+        Some(Nil)
+      } else if (isIncomplete) {
+        None
+      } else {
+        Some(trees)
+      }
+    }
+  }
+  // def parsesAsExpr(line: String) = {
+  //   import codeParser._
+  //   (opt expr line).isDefined
+  // }
+
+  def symbolOfLine(code: String): Symbol = {
+    def asExpr(): Symbol = {
+      val name = freshInternalVarName()
+      // Typing it with a lazy val would give us the right type, but runs
+      // into compiler bugs with things like existentials, so we compile it
+      // behind a def and strip the NullaryMethodType which wraps the expr.
+      val line = "def " + name + " = {\n" + code + "\n}"
+
+      interpretSynthetic(line) match {
+        case IR.Success =>
+          val sym0 = symbolOfTerm(name)
+          // drop NullaryMethodType
+          val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType)
+          if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym
+        case _ => NoSymbol
+      }
+    }
+    def asDefn(): Symbol = {
+      val old = repl.definedSymbolList.toSet
+
+      interpretSynthetic(code) match {
+        case IR.Success =>
+          repl.definedSymbolList filterNot old match {
+            case Nil => NoSymbol
+            case sym :: Nil => sym
+            case syms => NoSymbol.newOverloaded(NoPrefix, syms)
+          }
+        case _ => NoSymbol
+      }
+    }
+    beQuietDuring(asExpr()) orElse beQuietDuring(asDefn())
+  }
+
+  private var typeOfExpressionDepth = 0
+  def typeOfExpression(expr: String, silent: Boolean = true): Type = {
+    if (typeOfExpressionDepth > 2) {
+      logDebug("Terminating typeOfExpression recursion for expression: " + expr)
+      return NoType
+    }
+    typeOfExpressionDepth += 1
+    // Don't presently have a good way to suppress undesirable success output
+    // while letting errors through, so it is first trying it silently: if there
+    // is an error, and errors are desired, then it re-evaluates non-silently
+    // to induce the error message.
+    try beSilentDuring(symbolOfLine(expr).tpe) match {
+      case NoType if !silent => symbolOfLine(expr).tpe // generate error
+      case tpe => tpe
+    }
+    finally typeOfExpressionDepth -= 1
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala
new file mode 100644
index 0000000..5340951
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package scala.tools.nsc
+
+object SparkHelper {
+  def explicitParentLoader(settings: Settings) = settings.explicitParentLoader
+}
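A note on the profile handling introduced in project/SparkBuild.scala above: the sbt build now falls back to the scala-2.10 profile whenever the profiles parsed from SBT_MAVEN_PROFILES do not already name a scala profile. The snippet below is a minimal, self-contained sketch of that defaulting rule only; it is not part of the commit, and the ProfileDefaultSketch/effectiveProfiles names are invented for illustration.

import scala.util.Properties

object ProfileDefaultSketch {
  // Mirrors the rule added in SparkBuild.scala: split SBT_MAVEN_PROFILES into
  // profile names, drop any "-P" markers, and append "scala-2.10" when no
  // scala profile was requested.
  def effectiveProfiles(raw: Option[String]): Seq[String] = {
    val requested = raw match {
      case None => Seq.empty[String]
      case Some(v) =>
        v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
    }
    if (requested.exists(_.contains("scala"))) requested
    else requested :+ "scala-2.10" // "Enabled default scala profile"
  }

  def main(args: Array[String]): Unit = {
    println(effectiveProfiles(Properties.envOrNone("SBT_MAVEN_PROFILES")))
    println(effectiveProfiles(Some("-Pyarn,-Pscala-2.11"))) // List(yarn, scala-2.11)
    println(effectiveProfiles(None))                        // List(scala-2.10)
  }
}

The Maven side of the same switch is visible in the pom.xml and repl/pom.xml hunks: the scala-2.10 profile is activeByDefault while scala-2.11 is opt-in (e.g. selected with -Pscala-2.11), and the chosen profile also decides which jline the repl module pulls in (org.scala-lang:jline at ${scala.version} for 2.10, jline:jline 2.12 for 2.11) through the jline.groupid and jline.version properties.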
