Repository: spark
Updated Branches:
  refs/heads/master 83488cc31 -> 0c03297bf


[SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile, take 2

## What changes were proposed in this pull request?

Move Flume support behind a profile, take 2. See
https://github.com/apache/spark/pull/19365 for most of the back-story.

This change should fix the problem by removing the examples module's dependency
on the Flume modules and moving the Flume examples into the Flume module itself.
It also adds deprecation messages, per a discussion on the dev list about
deprecating Flume support in 2.3.0.
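
For reference, with this change the Flume modules (external/flume,
external/flume-sink, external/flume-assembly) are only built when the new
`flume` Maven profile is enabled. A minimal sketch of the intended usage,
mirroring the command added to docs/building-spark.md (the sbt invocation is
shown for illustration only):

    # Maven: build Spark including the Flume modules
    ./build/mvn -Pflume -DskipTests clean package

    # sbt: the same profile flag enables the Flume projects
    ./build/sbt -Pflume streaming-flume/test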

## How was this patch tested?

Existing tests, which still enable Flume integration.
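
For example, the Python Flume tests are now opted into via the
ENABLE_FLUME_TESTS environment variable (see the changes to
dev/sparktestsupport/modules.py and python/pyspark/streaming/tests.py below).
A rough sketch of running them locally, assuming the Flume assembly jar has
been built first:

    # Build the Flume assembly jar that the Python tests look for
    ./build/mvn -Pflume -DskipTests package

    # Opt in to the Flume stream tests, which are skipped by default
    ENABLE_FLUME_TESTS=1 python python/pyspark/streaming/tests.py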

Author: Sean Owen <so...@cloudera.com>

Closes #19412 from srowen/SPARK-22142.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c03297b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c03297b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c03297b

Branch: refs/heads/master
Commit: 0c03297bf0e87944f9fe0535fdae5518228e3e29
Parents: 83488cc
Author: Sean Owen <so...@cloudera.com>
Authored: Fri Oct 6 15:08:28 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Oct 6 15:08:28 2017 +0100

----------------------------------------------------------------------
 dev/create-release/release-build.sh             |  4 +-
 dev/mima                                        |  2 +-
 dev/scalastyle                                  |  1 +
 dev/sparktestsupport/modules.py                 | 20 +++++-
 dev/test-dependencies.sh                        |  2 +-
 docs/building-spark.md                          |  7 ++
 docs/streaming-flume-integration.md             | 13 ++--
 examples/pom.xml                                |  7 --
 .../examples/streaming/JavaFlumeEventCount.java | 69 -------------------
 .../examples/streaming/FlumeEventCount.scala    | 70 --------------------
 .../streaming/FlumePollingEventCount.scala      | 67 -------------------
 .../spark/examples/JavaFlumeEventCount.java     | 67 +++++++++++++++++++
 .../apache/spark/examples/FlumeEventCount.scala | 68 +++++++++++++++++++
 .../spark/examples/FlumePollingEventCount.scala | 65 ++++++++++++++++++
 .../spark/streaming/flume/FlumeUtils.scala      |  1 +
 pom.xml                                         | 13 +++-
 project/SparkBuild.scala                        | 17 ++---
 python/pyspark/streaming/flume.py               |  4 ++
 python/pyspark/streaming/tests.py               | 16 ++++-
 19 files changed, 273 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/create-release/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index 5390f59..7e8d5c7 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -84,9 +84,9 @@ MVN="build/mvn --force"
 # Hive-specific profiles for some builds
 HIVE_PROFILES="-Phive -Phive-thriftserver"
 # Profiles for publishing snapshots and release to Maven Central
-PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
 # Profiles for building binary releases
-BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr"
+BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr"
 # Scala 2.11 only profiles for some builds
 SCALA_2_11_PROFILES="-Pkafka-0-8"
 # Scala 2.12 only profiles for some builds

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/mima
----------------------------------------------------------------------
diff --git a/dev/mima b/dev/mima
index fdb21f5..1e3ca97 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"
 
-SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/scalastyle
----------------------------------------------------------------------
diff --git a/dev/scalastyle b/dev/scalastyle
index e5aa589..89ecc8a 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -25,6 +25,7 @@ ERRORS=$(echo -e "q\n" \
         -Pmesos \
         -Pkafka-0-8 \
         -Pyarn \
+        -Pflume \
         -Phive \
         -Phive-thriftserver \
         scalastyle test:scalastyle \

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 50e14b6..91d5667 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -279,6 +279,12 @@ streaming_flume_sink = Module(
     source_file_regexes=[
         "external/flume-sink",
     ],
+    build_profile_flags=[
+        "-Pflume",
+    ],
+    environ={
+        "ENABLE_FLUME_TESTS": "1"
+    },
     sbt_test_goals=[
         "streaming-flume-sink/test",
     ]
@@ -291,6 +297,12 @@ streaming_flume = Module(
     source_file_regexes=[
         "external/flume",
     ],
+    build_profile_flags=[
+        "-Pflume",
+    ],
+    environ={
+        "ENABLE_FLUME_TESTS": "1"
+    },
     sbt_test_goals=[
         "streaming-flume/test",
     ]
@@ -302,7 +314,13 @@ streaming_flume_assembly = Module(
     dependencies=[streaming_flume, streaming_flume_sink],
     source_file_regexes=[
         "external/flume-assembly",
-    ]
+    ],
+    build_profile_flags=[
+        "-Pflume",
+    ],
+    environ={
+        "ENABLE_FLUME_TESTS": "1"
+    }
 )
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/test-dependencies.sh
----------------------------------------------------------------------
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index c771457..58b295d 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -29,7 +29,7 @@ export LC_ALL=C
 # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.
 
 # NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Pflume -Phive"
 MVN="build/mvn"
 HADOOP_PROFILES=(
     hadoop-2.6

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/docs/building-spark.md
----------------------------------------------------------------------
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 57baa50..98f7df1 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -100,6 +100,13 @@ Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
 
 Kafka 0.10 support is still automatically built.
 
+## Building with Flume support
+
+Apache Flume support must be explicitly enabled with the `flume` profile.
+Note: Flume support is deprecated as of Spark 2.3.0.
+
+    ./build/mvn -Pflume -DskipTests clean package
+
 ## Building submodules individually
 
 It's possible to build Spark sub-modules using the `mvn -pl` option.

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/docs/streaming-flume-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index a5d36da..257a4f7 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -5,6 +5,8 @@ title: Spark Streaming + Flume Integration Guide
 
 [Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.
 
+**Note: Flume support is deprecated as of Spark 2.3.0.**
+
 ## Approach 1: Flume-style Push-based Approach
 Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.
 
@@ -44,8 +46,7 @@ configuring Flume agents.
 
               val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

-       See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$)
-       and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala).
+       See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$).
        </div>
        <div data-lang="java" markdown="1">
                import org.apache.spark.streaming.flume.*;
@@ -53,8 +54,7 @@ configuring Flume agents.
               JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
               FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);

-       See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
-       and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
+       See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html).
        </div>
        <div data-lang="python" markdown="1">
                from pyspark.streaming.flume import FlumeUtils
@@ -62,8 +62,7 @@ configuring Flume agents.
                flumeStream = FlumeUtils.createStream(streamingContext, [chosen 
machine's hostname], [chosen port])
 
        By default, the Python API will decode Flume event body as UTF8 encoded 
strings. You can specify your custom decoding function to decode the body byte 
arrays in Flume events to any arbitrary data type. 
-       See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/flume_wordcount.py).
+       See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
        </div>
        </div>
 
@@ -162,8 +161,6 @@ configuring Flume agents.
        </div>
        </div>
 
-       See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
-
        Note that each input DStream can be configured to receive data from multiple sinks.
 
 3. **Deploying:** This is same as the first approach.

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 52a6764..1791dba 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,7 +34,6 @@
     <sbt.project.name>examples</sbt.project.name>
     <build.testJarPhase>none</build.testJarPhase>
     <build.copyDependenciesPhase>package</build.copyDependenciesPhase>
-    <flume.deps.scope>provided</flume.deps.scope>
     <hadoop.deps.scope>provided</hadoop.deps.scope>
     <hive.deps.scope>provided</hive.deps.scope>
     <parquet.deps.scope>provided</parquet.deps.scope>
@@ -80,12 +79,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
deleted file mode 100644
index 0c65104..0000000
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.FlumeUtils;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-
-/**
- *  Produces a count of events received from Flume.
- *
- *  This should be used in conjunction with an AvroSink in Flume. It will start
- *  an Avro server on at the request host:port address and listen for requests.
- *  Your Flume AvroSink should be pointed to this address.
- *
- *  Usage: JavaFlumeEventCount <host> <port>
- *    <host> is the host the Flume receiver will be started on - a receiver
- *           creates a server and listens for flume events.
- *    <port> is the port the Flume receiver will listen on.
- *
- *  To run this example:
- *     `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
- */
-public final class JavaFlumeEventCount {
-  private JavaFlumeEventCount() {
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: JavaFlumeEventCount <host> <port>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    String host = args[0];
-    int port = Integer.parseInt(args[1]);
-
-    Duration batchInterval = new Duration(2000);
-    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
-    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
-    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
-      FlumeUtils.createStream(ssc, host, port);
-
-    flumeStream.count();
-
-    flumeStream.count().map(in -> "Received " + in + " flume events.").print();
-
-    ssc.start();
-    ssc.awaitTermination();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
deleted file mode 100644
index 91e52e4..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
-
-/**
- *  Produces a count of events received from Flume.
- *
- *  This should be used in conjunction with an AvroSink in Flume. It will start
- *  an Avro server on at the request host:port address and listen for requests.
- *  Your Flume AvroSink should be pointed to this address.
- *
- *  Usage: FlumeEventCount <host> <port>
- *    <host> is the host the Flume receiver will be started on - a receiver
- *           creates a server and listens for flume events.
- *    <port> is the port the Flume receiver will listen on.
- *
- *  To run this example:
- *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
- */
-object FlumeEventCount {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println(
-        "Usage: FlumeEventCount <host> <port>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(host, IntParam(port)) = args
-
-    val batchInterval = Milliseconds(2000)
-
-    // Create the context and set the batch size
-    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
-    val ssc = new StreamingContext(sparkConf, batchInterval)
-
-    // Create a flume stream
-    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
-
-    // Print out the count of events received from this server in each batch
-    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
deleted file mode 100644
index dd725d7..0000000
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
-
-/**
- *  Produces a count of events received from Flume.
- *
- *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
- *  the Spark Streaming programming guide for more details.
- *
- *  Usage: FlumePollingEventCount <host> <port>
- *    `host` is the host on which the Spark Sink is running.
- *    `port` is the port at which the Spark Sink is listening.
- *
- *  To run this example:
- *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
- */
-object FlumePollingEventCount {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println(
-        "Usage: FlumePollingEventCount <host> <port>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(host, IntParam(port)) = args
-
-    val batchInterval = Milliseconds(2000)
-
-    // Create the context and set the batch size
-    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
-    val ssc = new StreamingContext(sparkConf, batchInterval)
-
-    // Create a flume stream that polls the Spark Sink running in a Flume agent
-    val stream = FlumeUtils.createPollingStream(ssc, host, port)
-
-    // Print out the count of events received from this server in each batch
-    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java
new file mode 100644
index 0000000..4e3420d
--- /dev/null
+++ b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
+
+/**
+ *  Produces a count of events received from Flume.
+ *
+ *  This should be used in conjunction with an AvroSink in Flume. It will start
+ *  an Avro server on at the request host:port address and listen for requests.
+ *  Your Flume AvroSink should be pointed to this address.
+ *
+ *  Usage: JavaFlumeEventCount <host> <port>
+ *    <host> is the host the Flume receiver will be started on - a receiver
+ *           creates a server and listens for flume events.
+ *    <port> is the port the Flume receiver will listen on.
+ *
+ *  To run this example:
+ *     `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
+ */
+public final class JavaFlumeEventCount {
+  private JavaFlumeEventCount() {
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length != 2) {
+      System.err.println("Usage: JavaFlumeEventCount <host> <port>");
+      System.exit(1);
+    }
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+
+    Duration batchInterval = new Duration(2000);
+    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
+    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
+      FlumeUtils.createStream(ssc, host, port);
+
+    flumeStream.count();
+
+    flumeStream.count().map(in -> "Received " + in + " flume events.").print();
+
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala
new file mode 100644
index 0000000..f877f79
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+
+/**
+ *  Produces a count of events received from Flume.
+ *
+ *  This should be used in conjunction with an AvroSink in Flume. It will start
+ *  an Avro server on at the request host:port address and listen for requests.
+ *  Your Flume AvroSink should be pointed to this address.
+ *
+ *  Usage: FlumeEventCount <host> <port>
+ *    <host> is the host the Flume receiver will be started on - a receiver
+ *           creates a server and listens for flume events.
+ *    <port> is the port the Flume receiver will listen on.
+ *
+ *  To run this example:
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
+ */
+object FlumeEventCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println(
+        "Usage: FlumeEventCount <host> <port>")
+      System.exit(1)
+    }
+
+    val Array(host, IntParam(port)) = args
+
+    val batchInterval = Milliseconds(2000)
+
+    // Create the context and set the batch size
+    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
+    val ssc = new StreamingContext(sparkConf, batchInterval)
+
+    // Create a flume stream
+    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
+
+    // Print out the count of events received from this server in each batch
+    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala
new file mode 100644
index 0000000..79a4027
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
+
+/**
+ *  Produces a count of events received from Flume.
+ *
+ *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
+ *  the Spark Streaming programming guide for more details.
+ *
+ *  Usage: FlumePollingEventCount <host> <port>
+ *    `host` is the host on which the Spark Sink is running.
+ *    `port` is the port at which the Spark Sink is listening.
+ *
+ *  To run this example:
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
+ */
+object FlumePollingEventCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println(
+        "Usage: FlumePollingEventCount <host> <port>")
+      System.exit(1)
+    }
+
+    val Array(host, IntParam(port)) = args
+
+    val batchInterval = Milliseconds(2000)
+
+    // Create the context and set the batch size
+    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
+    val ssc = new StreamingContext(sparkConf, batchInterval)
+
+    // Create a flume stream that polls the Spark Sink running in a Flume agent
+    val stream = FlumeUtils.createPollingStream(ssc, host, port)
+
+    // Print out the count of events received from this server in each batch
+    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 3e3ed71..707193a 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
+@deprecated("Deprecated without replacement", "2.3.0")
 object FlumeUtils {
   private val DEFAULT_POLLING_PARALLELISM = 5
   private val DEFAULT_POLLING_BATCH_SIZE = 1000

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 87a468c..9fac8b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,15 +98,13 @@
     <module>sql/core</module>
     <module>sql/hive</module>
     <module>assembly</module>
-    <module>external/flume</module>
-    <module>external/flume-sink</module>
-    <module>external/flume-assembly</module>
     <module>examples</module>
     <module>repl</module>
     <module>launcher</module>
     <module>external/kafka-0-10</module>
     <module>external/kafka-0-10-assembly</module>
     <module>external/kafka-0-10-sql</module>
+    <!-- See additional modules enabled by profiles below -->
   </modules>
 
   <properties>
@@ -2583,6 +2581,15 @@
       </dependencies>
     </profile>
 
+    <profile>
+      <id>flume</id>
+      <modules>
+        <module>external/flume</module>
+        <module>external/flume-sink</module>
+        <module>external/flume-assembly</module>
+      </modules>
+    </profile>
+
     <!-- Ganglia integration is not included by default due to LGPL-licensed code -->
     <profile>
       <id>spark-ganglia-lgpl</id>

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a568d26..9501eed 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -43,11 +43,8 @@ object BuildCommons {
     "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10"
   ).map(ProjectRef(buildLocation, _))
 
-  val streamingProjects@Seq(
-    streaming, streamingFlumeSink, streamingFlume, streamingKafka010
-  ) = Seq(
-    "streaming", "streaming-flume-sink", "streaming-flume", 
"streaming-kafka-0-10"
-  ).map(ProjectRef(buildLocation, _))
+  val streamingProjects@Seq(streaming, streamingKafka010) =
+    Seq("streaming", "streaming-kafka-0-10").map(ProjectRef(buildLocation, _))
 
   val allProjects@Seq(
     core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, _*
@@ -56,9 +53,13 @@ object BuildCommons {
     "tags", "sketch", "kvstore"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
 
-  val optionallyEnabledProjects@Seq(mesos, yarn, streamingKafka, sparkGangliaLgpl,
-    streamingKinesisAsl, dockerIntegrationTests, hadoopCloud) =
-    Seq("mesos", "yarn", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
+  val optionallyEnabledProjects@Seq(mesos, yarn,
+    streamingFlumeSink, streamingFlume,
+    streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
+    dockerIntegrationTests, hadoopCloud) =
+    Seq("mesos", "yarn",
+      "streaming-flume-sink", "streaming-flume",
+      "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests", 
"hadoop-cloud").map(ProjectRef(buildLocation, _))
 
   val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/python/pyspark/streaming/flume.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index cd30483..2fed594 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -53,6 +53,8 @@ class FlumeUtils(object):
         :param enableDecompression:  Should netty server decompress input stream
         :param bodyDecoder:  A function used to decode body (default is utf8_decoder)
         :return: A DStream object
+
+        .. note:: Deprecated in 2.3.0
         """
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
         helper = FlumeUtils._get_helper(ssc._sc)
@@ -79,6 +81,8 @@ class FlumeUtils(object):
                              will result in this stream using more threads
         :param bodyDecoder:  A function used to decode body (default is utf8_decoder)
         :return: A DStream object
+
+        .. note:: Deprecated in 2.3.0
         """
         jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
         hosts = []

http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 229cf53..5b86c1c 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1478,7 +1478,7 @@ def search_kafka_assembly_jar():
             ("Failed to find Spark Streaming kafka assembly jar in %s. " % 
kafka_assembly_dir) +
             "You need to build Spark with "
             "'build/sbt assembly/package 
streaming-kafka-0-8-assembly/assembly' or "
-            "'build/mvn package' before running this test.")
+            "'build/mvn -Pkafka-0-8 package' before running this test.")
     elif len(jars) > 1:
         raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: 
%s; please "
                          "remove all but one") % (", ".join(jars)))
@@ -1495,7 +1495,7 @@ def search_flume_assembly_jar():
             ("Failed to find Spark Streaming Flume assembly jar in %s. " % 
flume_assembly_dir) +
             "You need to build Spark with "
             "'build/sbt assembly/assembly streaming-flume-assembly/assembly' 
or "
-            "'build/mvn package' before running this test.")
+            "'build/mvn -Pflume package' before running this test.")
     elif len(jars) > 1:
         raise Exception(("Found multiple Spark Streaming Flume assembly JARs: 
%s; please "
                         "remove all but one") % (", ".join(jars)))
@@ -1517,6 +1517,9 @@ def search_kinesis_asl_assembly_jar():
 
 
 # Must be same as the variable and condition defined in modules.py
+flume_test_environ_var = "ENABLE_FLUME_TESTS"
+are_flume_tests_enabled = os.environ.get(flume_test_environ_var) == '1'
+# Must be same as the variable and condition defined in modules.py
 kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS"
 are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1'
 # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py
@@ -1538,9 +1541,16 @@ if __name__ == "__main__":
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
-                 FlumeStreamTests, FlumePollingStreamTests,
                  StreamingListenerTests]
 
+    if are_flume_tests_enabled:
+        testcases.append(FlumeStreamTests)
+        testcases.append(FlumePollingStreamTests)
+    else:
+        sys.stderr.write(
+            "Skipped test_flume_stream (enable by setting environment variable 
%s=1"
+            % flume_test_environ_var)
+
     if are_kafka_tests_enabled:
         testcases.append(KafkaStreamTests)
     else:

