Repository: spark Updated Branches: refs/heads/master 69f5e9cce -> a00181418
http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala deleted file mode 100644 index 9241b13..0000000 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume - -import java.net.InetSocketAddress -import java.util.concurrent.ConcurrentLinkedQueue - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps - -import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.Eventually._ - -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.internal.Logging -import org.apache.spark.network.util.JavaUtils -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream} -import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.apache.spark.util.{ManualClock, Utils} - -class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { - - val maxAttempts = 5 - val batchDuration = Seconds(1) - - @transient private var _sc: SparkContext = _ - - val conf = new SparkConf() - .setMaster("local[2]") - .setAppName(this.getClass.getSimpleName) - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock") - - val utils = new PollingFlumeTestUtils - - override def beforeAll(): Unit = { - super.beforeAll() - _sc = new SparkContext(conf) - } - - override def afterAll(): Unit = { - try { - if (_sc != null) { - _sc.stop() - _sc = null - } - } finally { - super.afterAll() - } - } - - test("flume polling test") { - testMultipleTimes(() => testFlumePolling()) - } - - test("flume polling test multiple hosts") { - testMultipleTimes(() => testFlumePollingMultipleHost()) - } - - /** - * Run the given test until no more java.net.BindException's are thrown. - * Do this only up to a certain attempt limit. - */ - private def testMultipleTimes(test: () => Unit): Unit = { - var testPassed = false - var attempt = 0 - while (!testPassed && attempt < maxAttempts) { - try { - test() - testPassed = true - } catch { - case e: Exception if Utils.isBindCollision(e) => - logWarning("Exception when running flume polling test: " + e) - attempt += 1 - } - } - assert(testPassed, s"Test failed after $attempt attempts!") - } - - private def testFlumePolling(): Unit = { - try { - val port = utils.startSingleSink() - - writeAndVerify(Seq(port)) - utils.assertChannelsAreEmpty() - } finally { - utils.close() - } - } - - private def testFlumePollingMultipleHost(): Unit = { - try { - val ports = utils.startMultipleSinks() - writeAndVerify(ports) - utils.assertChannelsAreEmpty() - } finally { - utils.close() - } - } - - def writeAndVerify(sinkPorts: Seq[Int]): Unit = { - // Set up the streaming context and input streams - val ssc = new StreamingContext(_sc, batchDuration) - val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port)) - val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = - FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, - utils.eventsPerBatch, 5) - val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputQueue) - outputStream.register() - - ssc.start() - try { - utils.sendDataAndEnsureAllDataHasBeenReceived() - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - clock.advance(batchDuration.milliseconds) - - // The eventually is required to ensure that all data in the batch has been processed. - eventually(timeout(10 seconds), interval(100 milliseconds)) { - val flattenOutput = outputQueue.asScala.toSeq.flatten - val headers = flattenOutput.map(_.event.getHeaders.asScala.map { - case (key, value) => (key.toString, value.toString) - }).map(_.asJava) - val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody)) - utils.assertOutput(headers.asJava, bodies.asJava) - } - } finally { - // here stop ssc only, but not underlying sparkcontext - ssc.stop(false) - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala deleted file mode 100644 index 7bac1cc..0000000 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume - -import java.util.concurrent.ConcurrentLinkedQueue - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps - -import org.jboss.netty.channel.ChannelPipeline -import org.jboss.netty.channel.socket.SocketChannel -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.handler.codec.compression._ -import org.scalatest.{BeforeAndAfter, Matchers} -import org.scalatest.concurrent.Eventually._ - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.internal.Logging -import org.apache.spark.network.util.JavaUtils -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream} - -class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { - val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite") - var ssc: StreamingContext = null - - test("flume input stream") { - testFlumeStream(testCompression = false) - } - - test("flume input compressed stream") { - testFlumeStream(testCompression = true) - } - - /** Run test on flume stream */ - private def testFlumeStream(testCompression: Boolean): Unit = { - val input = (1 to 100).map { _.toString } - val utils = new FlumeTestUtils - try { - val outputQueue = startContext(utils.getTestPort(), testCompression) - - eventually(timeout(10 seconds), interval(100 milliseconds)) { - utils.writeInput(input.asJava, testCompression) - } - - eventually(timeout(10 seconds), interval(100 milliseconds)) { - val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event } - outputEvents.foreach { - event => - event.getHeaders.get("test") should be("header") - } - val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody)) - output should be (input) - } - } finally { - if (ssc != null) { - ssc.stop() - } - utils.close() - } - } - - /** Setup and start the streaming context */ - private def startContext( - testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = { - ssc = new StreamingContext(conf, Milliseconds(200)) - val flumeStream = FlumeUtils.createStream( - ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression) - val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputQueue) - outputStream.register() - ssc.start() - outputQueue - } - - /** Class to create socket channel with compression */ - private class CompressionChannelFactory(compressionLevel: Int) - extends NioClientSocketChannelFactory { - - override def newChannel(pipeline: ChannelPipeline): SocketChannel = { - val encoder = new ZlibEncoder(compressionLevel) - pipeline.addFirst("deflater", encoder) - pipeline.addFirst("inflater", new ZlibDecoder()) - super.newChannel(pipeline) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index 0acc9b8..ba4009e 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -108,7 +108,6 @@ private[spark] class DirectKafkaInputDStream[K, V]( } } - // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]") private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]" protected[streaming] override val checkpointData = http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 9297c39..2ec771e 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -67,7 +67,6 @@ class DirectKafkaInputDStream[ val maxRetries = context.sparkContext.getConf.getInt( "spark.streaming.kafka.maxRetries", 1) - // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]") private[streaming] override def name: String = s"Kafka direct stream [$id]" protected[streaming] override val checkpointData = http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 59ba317..7ce7c9f 100644 --- a/pom.xml +++ b/pom.xml @@ -121,7 +121,6 @@ <hadoop.version>2.7.3</hadoop.version> <protobuf.version>2.5.0</protobuf.version> <yarn.version>${hadoop.version}</yarn.version> - <flume.version>1.6.0</flume.version> <zookeeper.version>3.4.6</zookeeper.version> <curator.version>2.7.1</curator.version> <hive.group>org.spark-project.hive</hive.group> @@ -212,7 +211,6 @@ during compilation if the dependency is transivite (e.g. "graphx/" depending on "core/" and needing Hadoop classes in the classpath to compile). --> - <flume.deps.scope>compile</flume.deps.scope> <hadoop.deps.scope>compile</hadoop.deps.scope> <hive.deps.scope>compile</hive.deps.scope> <orc.deps.scope>compile</orc.deps.scope> @@ -1806,46 +1804,6 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-core</artifactId> - <version>${flume.version}</version> - <scope>${flume.deps.scope}</scope> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-auth</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.flume</groupId> - <artifactId>flume-ng-sdk</artifactId> - <version>${flume.version}</version> - <scope>${flume.deps.scope}</scope> - <exclusions> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> <version>${calcite.version}</version> @@ -2635,15 +2593,6 @@ </dependencies> </profile> - <profile> - <id>flume</id> - <modules> - <module>external/flume</module> - <module>external/flume-sink</module> - <module>external/flume-assembly</module> - </modules> - </profile> - <!-- Ganglia integration is not included by default due to LGPL-licensed code --> <profile> <id>spark-ganglia-lgpl</id> @@ -2836,9 +2785,6 @@ that does not have them. --> <profile> - <id>flume-provided</id> - </profile> - <profile> <id>hadoop-provided</id> </profile> <profile> http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a5ed908..8b01b90 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -55,16 +55,14 @@ object BuildCommons { ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn, - streamingFlumeSink, streamingFlume, streamingKafka, sparkGangliaLgpl, streamingKinesisAsl, dockerIntegrationTests, hadoopCloud, kubernetesIntegrationTests) = Seq("kubernetes", "mesos", "yarn", - "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests", "hadoop-cloud", "kubernetes-integration-tests").map(ProjectRef(buildLocation, _)) - val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) = - Seq("network-yarn", "streaming-flume-assembly", "streaming-kafka-0-8-assembly", "streaming-kafka-0-10-assembly", "streaming-kinesis-asl-assembly") + val assemblyProjects@Seq(networkYarn, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) = + Seq("network-yarn", "streaming-kafka-0-8-assembly", "streaming-kafka-0-10-assembly", "streaming-kinesis-asl-assembly") .map(ProjectRef(buildLocation, _)) val copyJarsProjects@Seq(assembly, examples) = Seq("assembly", "examples") @@ -373,8 +371,6 @@ object SparkBuild extends PomBuild { /* Hive console settings */ enable(Hive.settings)(hive) - enable(Flume.settings)(streamingFlumeSink) - // SPARK-14738 - Remove docker tests from main Spark build // enable(DockerIntegrationTests.settings)(dockerIntegrationTests) @@ -452,9 +448,6 @@ object Unsafe { ) } -object Flume { - lazy val settings = sbtavro.SbtAvro.avroSettings -} object DockerIntegrationTests { // This serves to override the override specified in DependencyOverrides: @@ -587,8 +580,7 @@ object Assembly { .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String]) }, jarName in assembly := { - if (moduleName.value.contains("streaming-flume-assembly") - || moduleName.value.contains("streaming-kafka-0-8-assembly") + if (moduleName.value.contains("streaming-kafka-0-8-assembly") || moduleName.value.contains("streaming-kafka-0-10-assembly") || moduleName.value.contains("streaming-kinesis-asl-assembly")) { // This must match the same name used in maven (see external/kafka-0-8-assembly/pom.xml) @@ -694,10 +686,10 @@ object Unidoc { publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes, yarn, tags, streamingKafka010, sqlKafka010, avro), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes, yarn, tags, streamingKafka010, sqlKafka010, avro), unidocAllClasspaths in (ScalaUnidoc, unidoc) := { http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/python/docs/pyspark.streaming.rst ---------------------------------------------------------------------- diff --git a/python/docs/pyspark.streaming.rst b/python/docs/pyspark.streaming.rst index 25ceaba..9c25628 100644 --- a/python/docs/pyspark.streaming.rst +++ b/python/docs/pyspark.streaming.rst @@ -22,10 +22,3 @@ pyspark.streaming.kinesis module :members: :undoc-members: :show-inheritance: - -pyspark.streaming.flume.module ------------------------------- -.. automodule:: pyspark.streaming.flume - :members: - :undoc-members: - :show-inheritance: http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/python/pyspark/streaming/dstream.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index ce42a85..946601e 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -45,7 +45,7 @@ class DStream(object): for more details on RDDs). DStreams can either be created from live data (such as, data from TCP - sockets, Kafka, Flume, etc.) using a L{StreamingContext} or it can be + sockets, Kafka, etc.) using a L{StreamingContext} or it can be generated by transforming existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream periodically generates a RDD, either http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/python/pyspark/streaming/flume.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py deleted file mode 100644 index 5de4481..0000000 --- a/python/pyspark/streaming/flume.py +++ /dev/null @@ -1,156 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import sys -if sys.version >= "3": - from io import BytesIO -else: - from StringIO import StringIO -import warnings - -from py4j.protocol import Py4JJavaError - -from pyspark.storagelevel import StorageLevel -from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int -from pyspark.streaming import DStream - -__all__ = ['FlumeUtils', 'utf8_decoder'] - - -def utf8_decoder(s): - """ Decode the unicode as UTF-8 """ - if s is None: - return None - return s.decode('utf-8') - - -class FlumeUtils(object): - - @staticmethod - def createStream(ssc, hostname, port, - storageLevel=StorageLevel.MEMORY_AND_DISK_2, - enableDecompression=False, - bodyDecoder=utf8_decoder): - """ - Create an input stream that pulls events from Flume. - - :param ssc: StreamingContext object - :param hostname: Hostname of the slave machine to which the flume data will be sent - :param port: Port of the slave machine to which the flume data will be sent - :param storageLevel: Storage level to use for storing the received objects - :param enableDecompression: Should netty server decompress input stream - :param bodyDecoder: A function used to decode body (default is utf8_decoder) - :return: A DStream object - - .. note:: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. - See SPARK-22142. - """ - warnings.warn( - "Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. " - "See SPARK-22142.", - DeprecationWarning) - jlevel = ssc._sc._getJavaStorageLevel(storageLevel) - helper = FlumeUtils._get_helper(ssc._sc) - jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression) - return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder) - - @staticmethod - def createPollingStream(ssc, addresses, - storageLevel=StorageLevel.MEMORY_AND_DISK_2, - maxBatchSize=1000, - parallelism=5, - bodyDecoder=utf8_decoder): - """ - Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. - This stream will poll the sink for data and will pull events as they are available. - - :param ssc: StreamingContext object - :param addresses: List of (host, port)s on which the Spark Sink is running. - :param storageLevel: Storage level to use for storing the received objects - :param maxBatchSize: The maximum number of events to be pulled from the Spark sink - in a single RPC call - :param parallelism: Number of concurrent requests this stream should send to the sink. - Note that having a higher number of requests concurrently being pulled - will result in this stream using more threads - :param bodyDecoder: A function used to decode body (default is utf8_decoder) - :return: A DStream object - - .. note:: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. - See SPARK-22142. - """ - warnings.warn( - "Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. " - "See SPARK-22142.", - DeprecationWarning) - jlevel = ssc._sc._getJavaStorageLevel(storageLevel) - hosts = [] - ports = [] - for (host, port) in addresses: - hosts.append(host) - ports.append(port) - helper = FlumeUtils._get_helper(ssc._sc) - jstream = helper.createPollingStream( - ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism) - return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder) - - @staticmethod - def _toPythonDStream(ssc, jstream, bodyDecoder): - ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) - stream = DStream(jstream, ssc, ser) - - def func(event): - headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0]) - headers = {} - strSer = UTF8Deserializer() - for i in range(0, read_int(headersBytes)): - key = strSer.loads(headersBytes) - value = strSer.loads(headersBytes) - headers[key] = value - body = bodyDecoder(event[1]) - return (headers, body) - return stream.map(func) - - @staticmethod - def _get_helper(sc): - try: - return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper() - except TypeError as e: - if str(e) == "'JavaPackage' object is not callable": - FlumeUtils._printErrorMsg(sc) - raise - - @staticmethod - def _printErrorMsg(sc): - print(""" -________________________________________________________________________________________________ - - Spark Streaming's Flume libraries not found in class path. Try one of the following. - - 1. Include the Flume library and its dependencies with in the - spark-submit command as - - $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ... - - 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, - Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s. - Then, include the jar in the spark-submit command as - - $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ... - -________________________________________________________________________________________________ - -""" % (sc.version, sc.version)) http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 5cef621..4b995c0 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -48,7 +48,6 @@ from pyspark.context import SparkConf, SparkContext, RDD from pyspark.storagelevel import StorageLevel from pyspark.streaming.context import StreamingContext from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition -from pyspark.streaming.flume import FlumeUtils from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream from pyspark.streaming.listener import StreamingListener @@ -1301,148 +1300,6 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream) -class FlumeStreamTests(PySparkStreamingTestCase): - timeout = 20 # seconds - duration = 1 - - def setUp(self): - super(FlumeStreamTests, self).setUp() - self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils() - - def tearDown(self): - if self._utils is not None: - self._utils.close() - self._utils = None - - super(FlumeStreamTests, self).tearDown() - - def _startContext(self, n, compressed): - # Start the StreamingContext and also collect the result - dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(), - enableDecompression=compressed) - result = [] - - def get_output(_, rdd): - for event in rdd.collect(): - if len(result) < n: - result.append(event) - dstream.foreachRDD(get_output) - self.ssc.start() - return result - - def _validateResult(self, input, result): - # Validate both the header and the body - header = {"test": "header"} - self.assertEqual(len(input), len(result)) - for i in range(0, len(input)): - self.assertEqual(header, result[i][0]) - self.assertEqual(input[i], result[i][1]) - - def _writeInput(self, input, compressed): - # Try to write input to the receiver until success or timeout - start_time = time.time() - while True: - try: - self._utils.writeInput(input, compressed) - break - except: - if time.time() - start_time < self.timeout: - time.sleep(0.01) - else: - raise - - def test_flume_stream(self): - input = [str(i) for i in range(1, 101)] - result = self._startContext(len(input), False) - self._writeInput(input, False) - self.wait_for(result, len(input)) - self._validateResult(input, result) - - def test_compressed_flume_stream(self): - input = [str(i) for i in range(1, 101)] - result = self._startContext(len(input), True) - self._writeInput(input, True) - self.wait_for(result, len(input)) - self._validateResult(input, result) - - -class FlumePollingStreamTests(PySparkStreamingTestCase): - timeout = 20 # seconds - duration = 1 - maxAttempts = 5 - - def setUp(self): - self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils() - - def tearDown(self): - if self._utils is not None: - self._utils.close() - self._utils = None - - def _writeAndVerify(self, ports): - # Set up the streaming context and input streams - ssc = StreamingContext(self.sc, self.duration) - try: - addresses = [("localhost", port) for port in ports] - dstream = FlumeUtils.createPollingStream( - ssc, - addresses, - maxBatchSize=self._utils.eventsPerBatch(), - parallelism=5) - outputBuffer = [] - - def get_output(_, rdd): - for e in rdd.collect(): - outputBuffer.append(e) - - dstream.foreachRDD(get_output) - ssc.start() - self._utils.sendDataAndEnsureAllDataHasBeenReceived() - - self.wait_for(outputBuffer, self._utils.getTotalEvents()) - outputHeaders = [event[0] for event in outputBuffer] - outputBodies = [event[1] for event in outputBuffer] - self._utils.assertOutput(outputHeaders, outputBodies) - finally: - ssc.stop(False) - - def _testMultipleTimes(self, f): - attempt = 0 - while True: - try: - f() - break - except: - attempt += 1 - if attempt >= self.maxAttempts: - raise - else: - import traceback - traceback.print_exc() - - def _testFlumePolling(self): - try: - port = self._utils.startSingleSink() - self._writeAndVerify([port]) - self._utils.assertChannelsAreEmpty() - finally: - self._utils.close() - - def _testFlumePollingMultipleHosts(self): - try: - port = self._utils.startSingleSink() - self._writeAndVerify([port]) - self._utils.assertChannelsAreEmpty() - finally: - self._utils.close() - - def test_flume_polling(self): - self._testMultipleTimes(self._testFlumePolling) - - def test_flume_polling_multiple_hosts(self): - self._testMultipleTimes(self._testFlumePollingMultipleHosts) - - class KinesisStreamTests(PySparkStreamingTestCase): def test_kinesis_stream_api(self): @@ -1531,23 +1388,6 @@ def search_kafka_assembly_jar(): return jars[0] -def search_flume_assembly_jar(): - SPARK_HOME = os.environ["SPARK_HOME"] - flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly") - jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly") - if not jars: - raise Exception( - ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) + - "You need to build Spark with " - "'build/sbt -Pflume assembly/package streaming-flume-assembly/assembly' or " - "'build/mvn -DskipTests -Pflume package' before running this test.") - elif len(jars) > 1: - raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please " - "remove all but one") % (", ".join(jars))) - else: - return jars[0] - - def _kinesis_asl_assembly_dir(): SPARK_HOME = os.environ["SPARK_HOME"] return os.path.join(SPARK_HOME, "external/kinesis-asl-assembly") @@ -1565,9 +1405,6 @@ def search_kinesis_asl_assembly_jar(): # Must be same as the variable and condition defined in modules.py -flume_test_environ_var = "ENABLE_FLUME_TESTS" -are_flume_tests_enabled = os.environ.get(flume_test_environ_var) == '1' -# Must be same as the variable and condition defined in modules.py kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS" are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1' # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py @@ -1577,15 +1414,14 @@ are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1' if __name__ == "__main__": from pyspark.streaming.tests import * kafka_assembly_jar = search_kafka_assembly_jar() - flume_assembly_jar = search_flume_assembly_jar() kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar() if kinesis_asl_assembly_jar is None: kinesis_jar_present = False - jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar) + jars = kafka_assembly_jar else: kinesis_jar_present = True - jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar) + jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar) existing_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell") jars_args = "--jars %s" % jars @@ -1593,14 +1429,6 @@ if __name__ == "__main__": testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests, StreamingListenerTests] - if are_flume_tests_enabled: - testcases.append(FlumeStreamTests) - testcases.append(FlumePollingStreamTests) - else: - sys.stderr.write( - "Skipped test_flume_stream (enable by setting environment variable %s=1" - % flume_test_environ_var) - if are_kafka_tests_enabled: testcases.append(KafkaStreamTests) else: http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 0274038..122f25b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -537,7 +537,7 @@ class StreamingContext private[streaming] ( ExecutorAllocationManager.isDynamicAllocationEnabled(conf)) { logWarning("Dynamic Allocation is enabled for this application. " + "Enabling Dynamic allocation for Spark Streaming applications can cause data loss if " + - "Write Ahead Log is not enabled for non-replayable sources like Flume. " + + "Write Ahead Log is not enabled for non-replayable sources. " + "See the programming guide for details on how to enable the Write Ahead Log.") } } http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala index a59f4ef..9939686 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala @@ -30,7 +30,7 @@ import org.apache.spark.streaming.dstream.DStream /** * A Java-friendly interface to [[org.apache.spark.streaming.dstream.DStream]], the basic * abstraction in Spark Streaming that represents a continuous stream of data. - * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, + * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, * etc.) or it can be generated by transforming existing DStreams using operations such as `map`, * `window`. For operations applicable to key-value pair DStreams, see * [[org.apache.spark.streaming.api.java.JavaPairDStream]]. http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 4a4d2c5..3524337 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -40,7 +40,7 @@ import org.apache.spark.util.{CallSite, Utils} * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous * sequence of RDDs (of the same type) representing a continuous stream of data (see * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). - * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, + * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by * transforming existing DStreams using operations such as `map`, * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 931f015..6495c91 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -56,7 +56,6 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) /** A human-readable name of this InputDStream */ private[streaming] def name: String = { - // e.g. FlumePollingDStream -> "Flume polling stream" val newName = Utils.getFormattedClassName(this) .replaceAll("InputDStream", "Stream") .split("(?=[A-Z])") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
