Scala 2.11 support with REPL and all build changes.
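
The cross-building workflow this commit sets up can be exercised roughly as follows. This is a minimal sketch, not part of the patch: the dev/change-version-to-2.11.sh script and the scala-2.11 profile id come from the diff below, while the exact Maven invocation is an assumption and may need adjusting for your environment.

    # rewrite every pom.xml so cross-built artifact ids use the _2.11 suffix
    ./dev/change-version-to-2.11.sh

    # build with the scala-2.11 profile added in this commit (flags are illustrative)
    mvn -Pscala-2.11 -DskipTests clean package

    # switch the poms back to the default 2.10 build
    ./dev/change-version-to-2.10.sh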

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4af9de7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4af9de7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4af9de7d

Branch: refs/heads/scala-2.11-prashant
Commit: 4af9de7dc72a809e10f4a287b509dec4ca12ae53
Parents: 48a19a6
Author: Prashant Sharma <[email protected]>
Authored: Mon Oct 20 17:43:38 2014 +0530
Committer: Prashant Sharma <[email protected]>
Committed: Fri Nov 7 11:22:47 2014 +0530

----------------------------------------------------------------------
 .rat-excludes                                   |    1 +
 assembly/pom.xml                                |    8 +-
 bin/compute-classpath.sh                        |    8 +-
 conf/spark-env.sh.template                      |    3 +
 core/pom.xml                                    |   41 +-
 dev/change-version-to-2.10.sh                   |   20 +
 dev/change-version-to-2.11.sh                   |   20 +
 examples/pom.xml                                |  149 +-
 .../examples/streaming/JavaKafkaWordCount.java  |  113 ++
 .../examples/streaming/KafkaWordCount.scala     |  102 ++
 .../examples/streaming/TwitterAlgebirdCMS.scala |  114 ++
 .../examples/streaming/TwitterAlgebirdHLL.scala |   92 ++
 .../examples/streaming/JavaKafkaWordCount.java  |  113 --
 .../examples/streaming/KafkaWordCount.scala     |  102 --
 .../examples/streaming/TwitterAlgebirdCMS.scala |  114 --
 .../examples/streaming/TwitterAlgebirdHLL.scala |   92 --
 external/mqtt/pom.xml                           |    5 -
 pom.xml                                         |  114 +-
 project/SparkBuild.scala                        |   12 +-
 project/project/SparkPluginBuild.scala          |    2 +-
 repl/pom.xml                                    |   90 +-
 .../main/scala/org/apache/spark/repl/Main.scala |   33 +
 .../apache/spark/repl/SparkCommandLine.scala    |   37 +
 .../org/apache/spark/repl/SparkExprTyper.scala  |  114 ++
 .../org/apache/spark/repl/SparkHelper.scala     |   22 +
 .../org/apache/spark/repl/SparkILoop.scala      | 1091 +++++++++++++
 .../org/apache/spark/repl/SparkILoopInit.scala  |  147 ++
 .../org/apache/spark/repl/SparkIMain.scala      | 1445 ++++++++++++++++++
 .../org/apache/spark/repl/SparkImports.scala    |  238 +++
 .../spark/repl/SparkJLineCompletion.scala       |  377 +++++
 .../apache/spark/repl/SparkJLineReader.scala    |   90 ++
 .../apache/spark/repl/SparkMemberHandlers.scala |  232 +++
 .../apache/spark/repl/SparkRunnerSettings.scala |   32 +
 .../scala/org/apache/spark/repl/ReplSuite.scala |  318 ++++
 .../main/scala/org/apache/spark/repl/Main.scala |   85 ++
 .../org/apache/spark/repl/SparkExprTyper.scala  |   86 ++
 .../org/apache/spark/repl/SparkILoop.scala      |  966 ++++++++++++
 .../org/apache/spark/repl/SparkIMain.scala      | 1319 ++++++++++++++++
 .../org/apache/spark/repl/SparkImports.scala    |  201 +++
 .../spark/repl/SparkJLineCompletion.scala       |  350 +++++
 .../apache/spark/repl/SparkMemberHandlers.scala |  221 +++
 .../apache/spark/repl/SparkReplReporter.scala   |   53 +
 .../scala/org/apache/spark/repl/ReplSuite.scala |  326 ++++
 .../main/scala/org/apache/spark/repl/Main.scala |   33 -
 .../apache/spark/repl/SparkCommandLine.scala    |   37 -
 .../org/apache/spark/repl/SparkExprTyper.scala  |  114 --
 .../org/apache/spark/repl/SparkHelper.scala     |   22 -
 .../org/apache/spark/repl/SparkILoop.scala      | 1091 -------------
 .../org/apache/spark/repl/SparkILoopInit.scala  |  147 --
 .../org/apache/spark/repl/SparkIMain.scala      | 1445 ------------------
 .../org/apache/spark/repl/SparkImports.scala    |  238 ---
 .../spark/repl/SparkJLineCompletion.scala       |  377 -----
 .../apache/spark/repl/SparkJLineReader.scala    |   90 --
 .../apache/spark/repl/SparkMemberHandlers.scala |  232 ---
 .../apache/spark/repl/SparkRunnerSettings.scala |   32 -
 .../scala/org/apache/spark/repl/ReplSuite.scala |  318 ----
 sql/catalyst/pom.xml                            |   29 +-
 57 files changed, 8602 insertions(+), 4701 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 20e3372..d8bee1f 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -44,6 +44,7 @@ SparkImports.scala
 SparkJLineCompletion.scala
 SparkJLineReader.scala
 SparkMemberHandlers.scala
+SparkReplReporter.scala
 sbt
 sbt-launch-lib.bash
 plugins.sbt

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 31a01e4..e592220 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -66,22 +66,22 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-repl_${scala.binary.version}</artifactId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <artifactId>spark-graphx_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-graphx_${scala.binary.version}</artifactId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <artifactId>spark-repl_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/bin/compute-classpath.sh
----------------------------------------------------------------------
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 905bbaf..993d260 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -20,7 +20,7 @@
 # This script computes Spark's classpath and prints it to stdout; it's used by both the "run"
 # script and the ExecutorRunner in standalone cluster mode.
 
-SCALA_VERSION=2.10
+SCALA_VERSION=${SCALA_VERSION:-"2.10"}
 
 # Figure out where Spark is installed
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
@@ -121,6 +121,9 @@ if [ -n "$datanucleus_jars" ]; then
   fi
 fi
 
+test_jars=$(find "$FWDIR"/lib_managed/test \( -name '*jar' -a -type f \) 2>/dev/null | \
+    tr "\n" : | sed s/:$//g)
+
 # Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
 if [[ $SPARK_TESTING == 1 ]]; then
   CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes"
@@ -132,6 +135,9 @@ if [[ $SPARK_TESTING == 1 ]]; then
   CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/test-classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/test-classes"
   CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/test-classes"
+  if [[ $SCALA_VERSION == "2.11" ]]; then
+      CLASSPATH="$CLASSPATH:$test_jars"
+  fi
 fi
 
 # Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail !

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/conf/spark-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index f8ffbf6..6a5622e 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -3,6 +3,9 @@
 # This file is sourced when running various Spark programs.
 # Copy it as spark-env.sh and edit that to configure Spark for your site.
 
+# Uncomment this if you plan to use scala 2.11
+# SCALA_VERSION=2.11
+
 # Options read when launching programs locally with 
 # ./bin/run-example or ./bin/spark-submit
 # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
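
As a usage note (not part of the patch): with the template change above, opting in to the 2.11 classpath layout is a matter of setting the variable in conf/spark-env.sh, which bin/compute-classpath.sh now treats as an overridable default rather than a hard-coded value.

    # conf/spark-env.sh (sketch)
    SCALA_VERSION=2.11

    # bin/compute-classpath.sh only falls back to 2.10 when the variable is unset:
    # SCALA_VERSION=${SCALA_VERSION:-"2.10"}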

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 41296e0..624aa96 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -35,6 +35,34 @@
   <url>http://spark.apache.org/</url>
   <dependencies>
     <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>chill_${scala.binary.version}</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.ow2.asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.ow2.asm</groupId>
+          <artifactId>asm-commons</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>chill-java</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.ow2.asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.ow2.asm</groupId>
+          <artifactId>asm-commons</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <exclusions>
@@ -133,14 +161,6 @@
       <artifactId>lz4</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>chill_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>chill-java</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.roaringbitmap</groupId>
       <artifactId>RoaringBitmap</artifactId>
     </dependency>
@@ -277,6 +297,10 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>chill-java</artifactId>
+    </dependency>
+    <dependency>
       <groupId>asm</groupId>
       <artifactId>asm</artifactId>
       <scope>test</scope>
@@ -424,4 +448,5 @@
       </resource>
     </resources>
   </build>
+
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/dev/change-version-to-2.10.sh
----------------------------------------------------------------------
diff --git a/dev/change-version-to-2.10.sh b/dev/change-version-to-2.10.sh
new file mode 100755
index 0000000..ca48e6f
--- /dev/null
+++ b/dev/change-version-to-2.10.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+find -name 'pom.xml' -exec sed -i 's|\(artifactId.*\)_2.11|\1_2.10|g' {}  \;

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/dev/change-version-to-2.11.sh
----------------------------------------------------------------------
diff --git a/dev/change-version-to-2.11.sh b/dev/change-version-to-2.11.sh
new file mode 100755
index 0000000..07056b1
--- /dev/null
+++ b/dev/change-version-to-2.11.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+find -name 'pom.xml' -exec sed -i 's|\(artifactId.*\)_2.10|\1_2.11|g' {}  \;
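
For illustration only (not part of the commit): run from the top of the source tree, the one-liner above rewrites cross-built artifact ids in place in every pom.xml, e.g.

    <artifactId>spark-core_2.10</artifactId>   -->   <artifactId>spark-core_2.11</artifactId>

It relies on GNU find/sed behaviour (no explicit search path, and sed -i without a backup suffix), so it is intended to be run on Linux from the repository root.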

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index bc32918..e80c637 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,24 +34,6 @@
   <name>Spark Project Examples</name>
   <url>http://spark.apache.org/</url>
 
-  <profiles>
-    <profile>
-      <id>kinesis-asl</id>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
-          <version>${project.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.httpcomponents</groupId>
-          <artifactId>httpclient</artifactId>
-          <version>${commons.httpclient.version}</version>
-        </dependency>
-      </dependencies>
-    </profile>
-  </profiles>
-
   <dependencies>
     <!-- Promote Guava to compile scope in this module so it's included while shading. -->
     <dependency>
@@ -102,22 +84,17 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId>
+      <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
+      <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -152,10 +129,6 @@
       <artifactId>jetty-server</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>algebird-core_${scala.binary.version}</artifactId>
-      <version>0.1.11</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
@@ -292,4 +265,122 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>kinesis-asl</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+          <version>${commons.httpclient.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>scala-2.10</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.twitter</groupId>
+          <artifactId>algebird-core_${scala.binary.version}</artifactId>
+          <version>0.1.11</version>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/scala</source>
+                    <source>scala-2.10/src/main/scala</source>
+                    <source>scala-2.10/src/main/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/scala</source>
+                    <source>scala-2.10/src/test/scala</source>
+                    <source>scala-2.10/src/test/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>scala-2.11</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <dependencies>
+        <!-- Streaming Kafka and zeromq modules are disabled for now. -->
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/scala</source>
+                    <source>scala-2.11/src/main/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/scala</source>
+                    <source>scala-2.11/src/test/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
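
The scala-2.10 and scala-2.11 profiles above pair with a per-Scala-version source layout in the examples module. Roughly (a sketch of the layout implied by the file moves in this commit, not an exhaustive listing):

    examples/
      src/main/scala/ ...                sources shared by both Scala versions
      scala-2.10/src/main/scala/ ...     Kafka and Algebird examples, built only under the scala-2.10 profile
      scala-2.10/src/main/java/ ...      JavaKafkaWordCount.java
      scala-2.11/src/main/scala/ ...     referenced by the scala-2.11 profile; no files are added here in this commit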

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
new file mode 100644
index 0000000..16ae9a3
--- /dev/null
+++ b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+
+
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ *
+ * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ *   <group> is the name of kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * To run this example:
+ *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
+ *    zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public final class JavaKafkaWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  private JavaKafkaWordCount() {
+  }
+
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
+    // Create the context with a 1 second batch size
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
+
+    int numThreads = Integer.parseInt(args[3]);
+    Map<String, Integer> topicMap = new HashMap<String, Integer>();
+    String[] topics = args[2].split(",");
+    for (String topic: topics) {
+      topicMap.put(topic, numThreads);
+    }
+
+    JavaPairReceiverInputDStream<String, String> messages =
+            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
+
+    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+      @Override
+      public String call(Tuple2<String, String> tuple2) {
+        return tuple2._2();
+      }
+    });
+
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
new file mode 100644
index 0000000..c9e1511
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.util.Properties
+
+import kafka.producer._
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ *   <group> is the name of kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
+ *      my-consumer-group topic1,topic2 1`
+ */
+object KafkaWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(zkQuorum, group, topics, numThreads) = args
+    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
+    val ssc =  new StreamingContext(sparkConf, Seconds(2))
+    ssc.checkpoint("checkpoint")
+
+    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
+    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1L))
+      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
+    wordCounts.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+// Produces some random words between 1 and 100.
+object KafkaWordCountProducer {
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
+        "<messagesPerSec> <wordsPerMessage>")
+      System.exit(1)
+    }
+
+    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
+
+    // Zookeper connection properties
+    val props = new Properties()
+    props.put("metadata.broker.list", brokers)
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+
+    // Send some messages
+    while(true) {
+      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
+          .mkString(" ")
+
+        new KeyedMessage[String, String](topic, str)
+      }.toArray
+
+      producer.send(messages: _*)
+      Thread.sleep(100)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
new file mode 100644
index 0000000..683752a
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.twitter._
+
+// scalastyle:off
+/**
+ * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
+ * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
+ * <br>
+ *   <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
+ *   the example operates on Long IDs. Once the implementation supports other inputs (such as String),
+ *   the same approach could be used for computing popular topics for example.
+ * <p>
+ * <p>
+ *   <a href=
+ *   "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
+ *   structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
+ *   of any given element, etc), that uses space sub-linear in the number of elements in the
+ *   stream. Once elements are added to the CMS, the estimated count of an element can be computed,
+ *   as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
+ *   count.
+ * <p><p>
+ *   Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
+ *   reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdCMS {
+  def main(args: Array[String]) {
+    StreamingExamples.setStreamingLogLevels()
+
+    // CMS parameters
+    val DELTA = 1E-3
+    val EPS = 0.01
+    val SEED = 1
+    val PERC = 0.001
+    // K highest frequency elements to take
+    val TOPK = 10
+
+    val filters = args
+    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
+    val ssc = new StreamingContext(sparkConf, Seconds(10))
+    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
+    var globalCMS = cms.zero
+    val mm = new MapMonoid[Long, Int]()
+    var globalExact = Map[Long, Int]()
+
+    val approxTopUsers = users.mapPartitions(ids => {
+      ids.map(id => cms.create(id))
+    }).reduce(_ ++ _)
+
+    val exactTopUsers = users.map(id => (id, 1))
+      .reduceByKey((a, b) => a + b)
+
+    approxTopUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        val partialTopK = partial.heavyHitters.map(id =>
+          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        globalCMS ++= partial
+        val globalTopK = globalCMS.heavyHitters.map(id =>
+          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
+          partialTopK.mkString("[", ",", "]")))
+        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
+          globalTopK.mkString("[", ",", "]")))
+      }
+    })
+
+    exactTopUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partialMap = rdd.collect().toMap
+        val partialTopK = rdd.map(
+          {case (id, count) => (count, id)})
+          .sortByKey(ascending = false).take(TOPK)
+        globalExact = mm.plus(globalExact.toMap, partialMap)
+        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
+        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
+        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
+      }
+    })
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
new file mode 100644
index 0000000..62db5e6
--- /dev/null
+++ b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import com.twitter.algebird.HyperLogLogMonoid
+import com.twitter.algebird.HyperLogLog._
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.twitter._
+import org.apache.spark.SparkConf
+
+// scalastyle:off
+/**
+ * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
+ * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
+ * <p>
+ * <p>
+ *   This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
+ *   blog post</a> and this
+ *   <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
+ *     blog post</a>
+ *   have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
+ *   estimating the cardinality of a data stream, i.e. the number of unique elements.
+ * <p><p>
+ *   Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
+ *   reduce operation.
+ */
+// scalastyle:on
+object TwitterAlgebirdHLL {
+  def main(args: Array[String]) {
+
+    StreamingExamples.setStreamingLogLevels()
+
+    /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
+    val BIT_SIZE = 12
+    val filters = args
+    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
+    val ssc = new StreamingContext(sparkConf, Seconds(5))
+    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
+
+    val users = stream.map(status => status.getUser.getId)
+
+    val hll = new HyperLogLogMonoid(BIT_SIZE)
+    var globalHll = hll.zero
+    var userSet: Set[Long] = Set()
+
+    val approxUsers = users.mapPartitions(ids => {
+      ids.map(id => hll(id))
+    }).reduce(_ + _)
+
+    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
+
+    approxUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        globalHll += partial
+        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
+        println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
+      }
+    })
+
+    exactUsers.foreachRDD(rdd => {
+      if (rdd.count() != 0) {
+        val partial = rdd.first()
+        userSet ++= partial
+        println("Exact distinct users this batch: %d".format(partial.size))
+        println("Exact distinct users overall: %d".format(userSet.size))
+        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
+          ) * 100))
+      }
+    })
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
deleted file mode 100644
index 16ae9a3..0000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.regex.Pattern;
-
-
-import scala.Tuple2;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- *
- * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
- *   <group> is the name of kafka consumer group
- *   <topics> is a list of one or more kafka topics to consume from
- *   <numThreads> is the number of threads the kafka consumer should use
- *
- * To run this example:
- *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
- *    zoo03 my-consumer-group topic1,topic2 1`
- */
-
-public final class JavaKafkaWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  private JavaKafkaWordCount() {
-  }
-
-  public static void main(String[] args) {
-    if (args.length < 4) {
-      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
-    // Create the context with a 1 second batch size
-    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
-
-    int numThreads = Integer.parseInt(args[3]);
-    Map<String, Integer> topicMap = new HashMap<String, Integer>();
-    String[] topics = args[2].split(",");
-    for (String topic: topics) {
-      topicMap.put(topic, numThreads);
-    }
-
-    JavaPairReceiverInputDStream<String, String> messages =
-            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
-
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
-
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-
-    wordCounts.print();
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
deleted file mode 100644
index c9e1511..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import java.util.Properties
-
-import kafka.producer._
-
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.kafka._
-import org.apache.spark.SparkConf
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
- *   <group> is the name of kafka consumer group
- *   <topics> is a list of one or more kafka topics to consume from
- *   <numThreads> is the number of threads the kafka consumer should use
- *
- * Example:
- *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
- *      my-consumer-group topic1,topic2 1`
- */
-object KafkaWordCount {
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(zkQuorum, group, topics, numThreads) = args
-    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
-    val ssc =  new StreamingContext(sparkConf, Seconds(2))
-    ssc.checkpoint("checkpoint")
-
-    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
-    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1L))
-      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
-    wordCounts.print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-
-// Produces some random words between 1 and 100.
-object KafkaWordCountProducer {
-
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
-        "<messagesPerSec> <wordsPerMessage>")
-      System.exit(1)
-    }
-
-    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
-
-    // Zookeper connection properties
-    val props = new Properties()
-    props.put("metadata.broker.list", brokers)
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-
-    val config = new ProducerConfig(props)
-    val producer = new Producer[String, String](config)
-
-    // Send some messages
-    while(true) {
-      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
-        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
-          .mkString(" ")
-
-        new KeyedMessage[String, String](topic, str)
-      }.toArray
-
-      producer.send(messages: _*)
-      Thread.sleep(100)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
deleted file mode 100644
index 683752a..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird._
-
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkContext._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.twitter._
-
-// scalastyle:off
-/**
- * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
- * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
- * <br>
- *   <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
- *   the example operates on Long IDs. Once the implementation supports other inputs (such as String),
- *   the same approach could be used for computing popular topics for example.
- * <p>
- * <p>
- *   <a href=
- *   "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
- *   structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
- *   of any given element, etc), that uses space sub-linear in the number of elements in the
- *   stream. Once elements are added to the CMS, the estimated count of an element can be computed,
- *   as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
- *   count.
- * <p><p>
- *   Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
- *   reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdCMS {
-  def main(args: Array[String]) {
-    StreamingExamples.setStreamingLogLevels()
-
-    // CMS parameters
-    val DELTA = 1E-3
-    val EPS = 0.01
-    val SEED = 1
-    val PERC = 0.001
-    // K highest frequency elements to take
-    val TOPK = 10
-
-    val filters = args
-    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS")
-    val ssc = new StreamingContext(sparkConf, Seconds(10))
-    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
-
-    val users = stream.map(status => status.getUser.getId)
-
-    val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
-    var globalCMS = cms.zero
-    val mm = new MapMonoid[Long, Int]()
-    var globalExact = Map[Long, Int]()
-
-    val approxTopUsers = users.mapPartitions(ids => {
-      ids.map(id => cms.create(id))
-    }).reduce(_ ++ _)
-
-    val exactTopUsers = users.map(id => (id, 1))
-      .reduceByKey((a, b) => a + b)
-
-    approxTopUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        val partialTopK = partial.heavyHitters.map(id =>
-          (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
-        globalCMS ++= partial
-        val globalTopK = globalCMS.heavyHitters.map(id =>
-          (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK)
-        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC,
-          partialTopK.mkString("[", ",", "]")))
-        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC,
-          globalTopK.mkString("[", ",", "]")))
-      }
-    })
-
-    exactTopUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partialMap = rdd.collect().toMap
-        val partialTopK = rdd.map(
-          {case (id, count) => (count, id)})
-          .sortByKey(ascending = false).take(TOPK)
-        globalExact = mm.plus(globalExact.toMap, partialMap)
-        val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK)
-        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
-        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
-      }
-    })
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
deleted file mode 100644
index 62db5e6..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import com.twitter.algebird.HyperLogLogMonoid
-import com.twitter.algebird.HyperLogLog._
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.twitter._
-import org.apache.spark.SparkConf
-
-// scalastyle:off
-/**
- * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
- * a windowed and global estimate of the unique user IDs occurring in a Twitter stream.
- * <p>
- * <p>
- *   This <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- *   blog post</a> and this
- *   <a href= "http://highscalability.com/blog/2012/4/5/big-data-counting-how-to-count-a-billion-distinct-objects-us.html">
- *     blog post</a>
- *   have good overviews of HyperLogLog (HLL). HLL is a memory-efficient datastructure for
- *   estimating the cardinality of a data stream, i.e. the number of unique elements.
- * <p><p>
- *   Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the
- *   reduce operation.
- */
-// scalastyle:on
-object TwitterAlgebirdHLL {
-  def main(args: Array[String]) {
-
-    StreamingExamples.setStreamingLogLevels()
-
-    /** Bit size parameter for HyperLogLog, trades off accuracy vs size */
-    val BIT_SIZE = 12
-    val filters = args
-    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdHLL")
-    val ssc = new StreamingContext(sparkConf, Seconds(5))
-    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
-
-    val users = stream.map(status => status.getUser.getId)
-
-    val hll = new HyperLogLogMonoid(BIT_SIZE)
-    var globalHll = hll.zero
-    var userSet: Set[Long] = Set()
-
-    val approxUsers = users.mapPartitions(ids => {
-      ids.map(id => hll(id))
-    }).reduce(_ + _)
-
-    val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
-
-    approxUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        globalHll += partial
-        println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
-        println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt))
-      }
-    })
-
-    exactUsers.foreachRDD(rdd => {
-      if (rdd.count() != 0) {
-        val partial = rdd.first()
-        userSet ++= partial
-        println("Exact distinct users this batch: %d".format(partial.size))
-        println("Exact distinct users overall: %d".format(userSet.size))
-        println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1
-          ) * 100))
-      }
-    })
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
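
The scaladoc in the file above notes that Algebird's HyperLogLog implementation is a monoid, so
partial sketches can be merged with a plain reduce. A minimal, non-streaming sketch of the same
idea (the object name and the sample ids are illustrative only, not part of this commit):

    import com.twitter.algebird.HyperLogLogMonoid
    import com.twitter.algebird.HyperLogLog._

    object HllSketch {
      def main(args: Array[String]): Unit = {
        // 12 bits, the same accuracy/size trade-off as BIT_SIZE in the example above.
        val hll = new HyperLogLogMonoid(12)
        val ids: Seq[Long] = Seq(1L, 2L, 2L, 3L, 42L)
        // Each id becomes a single-element HLL; the monoid's plus ("+") merges them.
        val merged = ids.map(id => hll(id)).reduce(_ + _)
        println("Approx distinct ids: %d (exact: %d)"
          .format(merged.estimatedSize.toInt, ids.distinct.size))
      }
    }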

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/external/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 371f1f1..362a76e 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -53,11 +53,6 @@
        <version>0.4.0</version>
     </dependency>
     <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 88ef67c..1bd4970 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,30 +97,26 @@
     <module>sql/catalyst</module>
     <module>sql/core</module>
     <module>sql/hive</module>
-    <module>repl</module>
     <module>assembly</module>
     <module>external/twitter</module>
-    <module>external/kafka</module>
     <module>external/flume</module>
     <module>external/flume-sink</module>
-    <module>external/zeromq</module>
     <module>external/mqtt</module>
+    <module>external/zeromq</module>
     <module>examples</module>
+    <module>repl</module>
   </modules>
 
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-
+    <akka.group>org.spark-project.akka</akka.group>
+    <akka.version>2.3.4-spark</akka.version>
     <java.version>1.6</java.version>
     <sbt.project.name>spark</sbt.project.name>
-    <scala.version>2.10.4</scala.version>
-    <scala.binary.version>2.10</scala.binary.version>
     <scala.macros.version>2.0.1</scala.macros.version>
     <mesos.version>0.18.1</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
-    <akka.group>org.spark-project.akka</akka.group>
-    <akka.version>2.3.4-spark</akka.version>
     <slf4j.version>1.7.5</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>1.0.4</hadoop.version>
@@ -137,7 +133,7 @@
     <parquet.version>1.6.0rc3</parquet.version>
     <jblas.version>1.2.3</jblas.version>
     <jetty.version>8.1.14.v20131031</jetty.version>
-    <chill.version>0.3.6</chill.version>
+    <chill.version>0.5.0</chill.version>
     <codahale.metrics.version>3.0.0</codahale.metrics.version>
     <avro.version>1.7.6</avro.version>
     <avro.mapred.classifier></avro.mapred.classifier>
@@ -281,6 +277,41 @@
   <dependencyManagement>
     <dependencies>
       <dependency>
+        <groupId>${jline.groupid}</groupId>
+        <artifactId>jline</artifactId>
+        <version>${jline.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.twitter</groupId>
+        <artifactId>chill_${scala.binary.version}</artifactId>
+        <version>${chill.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm-commons</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>com.twitter</groupId>
+        <artifactId>chill-java</artifactId>
+        <version>${chill.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.ow2.asm</groupId>
+            <artifactId>asm-commons</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
         <groupId>org.eclipse.jetty</groupId>
         <artifactId>jetty-util</artifactId>
         <version>${jetty.version}</version>
@@ -396,36 +427,6 @@
         <version>${protobuf.version}</version>
       </dependency>
       <dependency>
-        <groupId>com.twitter</groupId>
-        <artifactId>chill_${scala.binary.version}</artifactId>
-        <version>${chill.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm-commons</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>com.twitter</groupId>
-        <artifactId>chill-java</artifactId>
-        <version>${chill.version}</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm-commons</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
         <groupId>${akka.group}</groupId>
         <artifactId>akka-actor_${scala.binary.version}</artifactId>
         <version>${akka.version}</version>
@@ -514,11 +515,6 @@
       </dependency>
       <dependency>
         <groupId>org.scala-lang</groupId>
-        <artifactId>jline</artifactId>
-        <version>${scala.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.scala-lang</groupId>
         <artifactId>scala-library</artifactId>
         <version>${scala.version}</version>
       </dependency>
@@ -1366,5 +1362,35 @@
         <derby.version>10.10.1.1</derby.version>
       </properties>
     </profile>
+
+    <profile>
+      <id>scala-2.10</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <scala.version>2.10.4</scala.version>
+        <scala.binary.version>2.10</scala.binary.version>
+        <jline.version>${scala.version}</jline.version>
+        <jline.groupid>org.scala-lang</jline.groupid>
+      </properties>
+      <modules>
+        <module>external/kafka</module>
+      </modules>
+    </profile>
+
+    <profile>
+      <id>scala-2.11</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <scala.version>2.11.2</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <jline.version>2.12</jline.version>
+        <jline.groupid>jline</jline.groupid>
+      </properties>
+    </profile>
+
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 657e4b4..349cc27 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -91,13 +91,21 @@ object SparkBuild extends PomBuild {
     profiles
   }
 
-  override val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match {
+  override val profiles = {
+    val profiles = Properties.envOrNone("SBT_MAVEN_PROFILES") match {
     case None => backwardCompatibility
     case Some(v) =>
       if (backwardCompatibility.nonEmpty)
        println("Note: We ignore environment variables, when use of profile is detected in " +
           "conjunction with environment variable.")
      v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
+    }
+    if(profiles.exists(_.contains("scala"))) {
+      profiles
+    } else {
+      println("Enabled default scala profile")
+      profiles ++ Seq("scala-2.10")
+    }
   }
 
   Properties.envOrNone("SBT_MAVEN_PROPERTIES") match {
@@ -354,7 +362,7 @@ object TestSettings {
    javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
       .split(" ").toSeq,
     javaOptions += "-Xmx3g",
-
+    retrievePattern := "[conf]/[artifact](-[revision]).[ext]",
     // Show full stack trace and duration in test cases.
     testOptions in Test += Tests.Argument("-oDF"),
     testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
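
The SparkBuild change above appends a default "scala-2.10" profile whenever SBT_MAVEN_PROFILES
names no scala profile. A standalone sketch of that defaulting logic (the function name below is
illustrative, not something defined in SparkBuild.scala):

    def withDefaultScalaProfile(profiles: Seq[String]): Seq[String] =
      // Keep the caller's choice if any profile already mentions "scala",
      // otherwise fall back to the Scala 2.10 build, mirroring the Maven default.
      if (profiles.exists(_.contains("scala"))) profiles
      else profiles ++ Seq("scala-2.10")

    // withDefaultScalaProfile(Seq("yarn", "hive"))  == Seq("yarn", "hive", "scala-2.10")
    // withDefaultScalaProfile(Seq("scala-2.11"))    == Seq("scala-2.11")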

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/project/project/SparkPluginBuild.scala
----------------------------------------------------------------------
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 3ef2d54..8863f27 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -26,7 +26,7 @@ import sbt.Keys._
 object SparkPluginDef extends Build {
  lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle, sbtPomReader)
  lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings)
-  lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git")
+  lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git#ignore_artifact_id")
 
   // There is actually no need to publish this artifact.
   def styleSettings = Defaults.defaultSettings ++ Seq (

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/pom.xml
----------------------------------------------------------------------
diff --git a/repl/pom.xml b/repl/pom.xml
index af528c8..bd688c8 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -39,6 +39,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>${jline.groupid}</groupId>
+      <artifactId>jline</artifactId>
+      <version>${jline.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
@@ -76,11 +81,6 @@
       <version>${scala.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>jline</artifactId>
-      <version>${scala.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jul-to-slf4j</artifactId>
     </dependency>
@@ -124,4 +124,84 @@
       </plugin>
     </plugins>
   </build>
+  <profiles>
+    <profile>
+      <id>scala-2.10</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/scala</source>
+                    <source>scala-2.10/src/main/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/scala</source>
+                    <source>scala-2.10/src/test/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>scala-2.11</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/scala</source>
+                    <source>scala-2.11/src/main/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/scala</source>
+                    <source>scala-2.11/src/test/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
new file mode 100644
index 0000000..14b448d
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.collection.mutable.Set
+
+object Main {
+  private var _interp: SparkILoop = _
+
+  def interp = _interp
+
+  def interp_=(i: SparkILoop) { _interp = i }
+
+  def main(args: Array[String]) {
+    _interp = new SparkILoop
+    _interp.process(args)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
new file mode 100644
index 0000000..0581694
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc.{Settings, CompilerCommand}
+import scala.Predef._
+
+/**
+ * Command class enabling Spark-specific command line options (provided by
+ * <i>org.apache.spark.repl.SparkRunnerSettings</i>).
+ */
+class SparkCommandLine(args: List[String], override val settings: Settings)
+    extends CompilerCommand(args, settings) {
+
+  def this(args: List[String], error: String => Unit) {
+    this(args, new SparkRunnerSettings(error))
+  }
+
+  def this(args: List[String]) {
+    this(args, str => Console.println("Error: " + str))
+  }
+}
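
The class added above wraps scalac's CompilerCommand so Spark-specific options can be supplied
through SparkRunnerSettings. A hypothetical usage sketch (the chosen flag and error handler are
illustrative, not part of this commit):

    import org.apache.spark.repl.SparkCommandLine

    // "-usejavacp" is a standard scalac option; errors are routed to the supplied handler.
    val cmd = new SparkCommandLine(List("-usejavacp"), (msg: String) => Console.err.println(msg))
    val settings = cmd.settings   // scala.tools.nsc.Settings carrying the parsed options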

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
new file mode 100644
index 0000000..f8432c8
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -0,0 +1,114 @@
+// scalastyle:off
+
+/* NSC -- new Scala compiler
+ * Copyright 2005-2013 LAMP/EPFL
+ * @author  Paul Phillips
+ */
+
+package org.apache.spark.repl
+
+import scala.tools.nsc._
+import scala.tools.nsc.interpreter._
+
+import scala.reflect.internal.util.BatchSourceFile
+import scala.tools.nsc.ast.parser.Tokens.EOF
+
+import org.apache.spark.Logging
+
+trait SparkExprTyper extends Logging {
+  val repl: SparkIMain
+
+  import repl._
+  import global.{ reporter => _, Import => _, _ }
+  import definitions._
+  import syntaxAnalyzer.{ UnitParser, UnitScanner, token2name }
+  import naming.freshInternalVarName
+
+  object codeParser extends { val global: repl.global.type = repl.global } with CodeHandlers[Tree] {
+    def applyRule[T](code: String, rule: UnitParser => T): T = {
+      reporter.reset()
+      val scanner = newUnitParser(code)
+      val result  = rule(scanner)
+
+      if (!reporter.hasErrors)
+        scanner.accept(EOF)
+
+      result
+    }
+
+    def defns(code: String) = stmts(code) collect { case x: DefTree => x }
+    def expr(code: String)  = applyRule(code, _.expr())
+    def stmts(code: String) = applyRule(code, _.templateStats())
+    def stmt(code: String)  = stmts(code).last  // guaranteed nonempty
+  }
+
+  /** Parse a line into a sequence of trees. Returns None if the input is incomplete. */
+  def parse(line: String): Option[List[Tree]] = debugging(s"""parse("$line")""")  {
+    var isIncomplete = false
+    reporter.withIncompleteHandler((_, _) => isIncomplete = true) {
+      val trees = codeParser.stmts(line)
+      if (reporter.hasErrors) {
+        Some(Nil)
+      } else if (isIncomplete) {
+        None
+      } else {
+        Some(trees)
+      }
+    }
+  }
+  // def parsesAsExpr(line: String) = {
+  //   import codeParser._
+  //   (opt expr line).isDefined
+  // }
+
+  def symbolOfLine(code: String): Symbol = {
+    def asExpr(): Symbol = {
+      val name  = freshInternalVarName()
+      // Typing it with a lazy val would give us the right type, but runs
+      // into compiler bugs with things like existentials, so we compile it
+      // behind a def and strip the NullaryMethodType which wraps the expr.
+      val line = "def " + name + " = {\n" + code + "\n}"
+
+      interpretSynthetic(line) match {
+        case IR.Success =>
+          val sym0 = symbolOfTerm(name)
+          // drop NullaryMethodType
+          val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType)
+          if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym
+        case _          => NoSymbol
+      }
+    }
+    def asDefn(): Symbol = {
+      val old = repl.definedSymbolList.toSet
+
+      interpretSynthetic(code) match {
+        case IR.Success =>
+          repl.definedSymbolList filterNot old match {
+            case Nil        => NoSymbol
+            case sym :: Nil => sym
+            case syms       => NoSymbol.newOverloaded(NoPrefix, syms)
+          }
+        case _ => NoSymbol
+      }
+    }
+    beQuietDuring(asExpr()) orElse beQuietDuring(asDefn())
+  }
+
+  private var typeOfExpressionDepth = 0
+  def typeOfExpression(expr: String, silent: Boolean = true): Type = {
+    if (typeOfExpressionDepth > 2) {
+      logDebug("Terminating typeOfExpression recursion for expression: " + expr)
+      return NoType
+    }
+    typeOfExpressionDepth += 1
+    // Don't presently have a good way to suppress undesirable success output
+    // while letting errors through, so it is first trying it silently: if there
+    // is an error, and errors are desired, then it re-evaluates non-silently
+    // to induce the error message.
+    try beSilentDuring(symbolOfLine(expr).tpe) match {
+      case NoType if !silent => symbolOfLine(expr).tpe // generate error
+      case tpe               => tpe
+    }
+    finally typeOfExpressionDepth -= 1
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4af9de7d/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala
new file mode 100644
index 0000000..5340951
--- /dev/null
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkHelper.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package scala.tools.nsc
+
+object SparkHelper {
+  def explicitParentLoader(settings: Settings) = settings.explicitParentLoader
+}

