Repository: spark Updated Branches: refs/heads/master 54d13bed8 -> 24587ce43
http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala new file mode 100644 index 0000000..7dc9606 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.spark.streaming.flume

import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.concurrent.Executors

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.avro.ipc.NettyServer
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol, Status}
import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.compression._

import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils

/**
 * A [[ReceiverInputDStream]] whose receiver is a [[FlumeReceiver]] listening on `host:port`
 * for events pushed over Flume's Avro protocol.
 *
 * @tparam T not referenced by this class (the stream always yields [[SparkFlumeEvent]]s);
 *           kept for source compatibility with existing callers.
 */
private[streaming]
class FlumeInputDStream[T: ClassTag](
  _ssc: StreamingContext,
  host: String,
  port: Int,
  storageLevel: StorageLevel,
  enableDecompression: Boolean
) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {

  override def getReceiver(): Receiver[SparkFlumeEvent] = {
    new FlumeReceiver(host, port, storageLevel, enableDecompression)
  }
}

/**
 * A wrapper class for AvroFlumeEvent's with a custom serialization format.
 *
 * This is necessary because AvroFlumeEvent uses inner data structures
 * which are not serializable.
 *
 * Wire format: body length (Int) and body bytes, then the header count (Int), then for every
 * header a length-prefixed Java-serialized key string followed by a length-prefixed
 * Java-serialized value string.
 */
class SparkFlumeEvent() extends Externalizable {
  var event: AvroFlumeEvent = new AvroFlumeEvent()

  /** De-serialize from bytes produced by `writeExternal`. */
  def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    val bodyBuff = readLengthPrefixed(in)

    val numHeaders = in.readInt()
    val headers = new java.util.HashMap[CharSequence, CharSequence]
    (0 until numHeaders).foreach { _ =>
      // The key is always read before its value, mirroring writeExternal.
      val key: String = Utils.deserialize(readLengthPrefixed(in))
      val value: String = Utils.deserialize(readLengthPrefixed(in))
      headers.put(key, value)
    }

    event.setBody(ByteBuffer.wrap(bodyBuff))
    event.setHeaders(headers)
  }

  /** Serialize to bytes in the format read back by `readExternal`. */
  def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    val body = event.getBody
    out.writeInt(body.remaining())
    Utils.writeByteBuffer(body, out)

    val headers = event.getHeaders
    out.writeInt(headers.size())
    headers.asScala.foreach { case (k, v) =>
      writeLengthPrefixed(out, Utils.serialize(k.toString))
      writeLengthPrefixed(out, Utils.serialize(v.toString))
    }
  }

  /** Reads an Int length followed by exactly that many bytes. */
  private def readLengthPrefixed(in: ObjectInput): Array[Byte] = {
    val buff = new Array[Byte](in.readInt())
    in.readFully(buff)
    buff
  }

  /** Writes the array's length as an Int followed by the array's bytes. */
  private def writeLengthPrefixed(out: ObjectOutput, bytes: Array[Byte]): Unit = {
    out.writeInt(bytes.length)
    out.write(bytes)
  }
}

private[streaming] object SparkFlumeEvent {
  /** Wraps an existing Avro event (the event itself is not copied). */
  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
    val event = new SparkFlumeEvent
    event.event = in
    event
  }
}

/**
 * A simple server that implements Flume's Avro protocol by storing every received event
 * through `receiver` and answering `Status.OK`.
 */
private[streaming]
class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
  override def append(event: AvroFlumeEvent): Status = {
    receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
    Status.OK
  }

  override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
    events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
    Status.OK
  }
}

/**
 * A [[Receiver]] which runs an Avro `NettyServer` on `host:port` and stores the events that
 * Flume pushes to it via the Avro source protocol.
 */
private[streaming]
class FlumeReceiver(
    host: String,
    port: Int,
    storageLevel: StorageLevel,
    enableDecompression: Boolean
  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {

  lazy val responder = new SpecificResponder(
    classOf[AvroSourceProtocol], new FlumeEventServer(this))

  // The running server, or null when the receiver is stopped. Guarded by `this`.
  var server: NettyServer = null

  /** Builds the Avro server; installs a zlib pipeline when decompression is enabled. */
  private def initServer() = {
    val address = new InetSocketAddress(host, port)
    if (enableDecompression) {
      val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
        Executors.newCachedThreadPool())
      val channelPipelineFactory = new CompressionChannelPipelineFactory()

      new NettyServer(
        responder,
        address,
        channelFactory,
        channelPipelineFactory,
        null)
    } else {
      new NettyServer(responder, address)
    }
  }

  def onStart(): Unit = {
    synchronized {
      if (server == null) {
        server = initServer()
        server.start()
      } else {
        logWarning("Flume receiver being asked to start more than once without close")
      }
    }
    logInfo("Flume receiver started")
  }

  def onStop(): Unit = {
    synchronized {
      if (server != null) {
        server.close()
        server = null
      }
    }
    logInfo("Flume receiver stopped")
  }

  override def preferredLocation: Option[String] = Option(host)

  /**
   * A Netty pipeline factory that decompresses incoming data from the Netty client and
   * compresses data going back to the client.
   *
   * The compression on the return is required because Flume requires
   * a successful response to indicate it can remove the event/batch
   * from the configured channel
   */
  private[streaming]
  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
    def getPipeline(): ChannelPipeline = {
      val pipeline = Channels.pipeline()
      val encoder = new ZlibEncoder(6)
      pipeline.addFirst("deflater", encoder)
      pipeline.addFirst("inflater", new ZlibDecoder())
      pipeline
    }
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala new file mode 100644 index 0000000..250bfc1 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/
package org.apache.spark.streaming.flume

import java.net.InetSocketAddress
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory

import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.sink._
import org.apache.spark.streaming.receiver.Receiver

/**
 * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running
 * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
 * @param _ssc Streaming context that will execute this input stream
 * @param addresses List of addresses at which SparkSinks are listening (one connection each)
 * @param maxBatchSize Maximum size of a batch
 * @param parallelism Number of worker threads concurrently pulling batches from the sinks
 * @param storageLevel The storage level to use.
 * @tparam T Not referenced by this class (the stream always yields [[SparkFlumeEvent]]s);
 *           kept for source compatibility.
 */
private[streaming] class FlumePollingInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    val addresses: Seq[InetSocketAddress],
    val maxBatchSize: Int,
    val parallelism: Int,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {

  override def getReceiver(): Receiver[SparkFlumeEvent] = {
    new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
  }
}

/**
 * Receiver that opens one Avro connection per address and runs `parallelism`
 * [[FlumeBatchFetcher]] tasks which share those connections through a blocking queue.
 */
private[streaming] class FlumePollingReceiver(
    addresses: Seq[InetSocketAddress],
    maxBatchSize: Int,
    parallelism: Int,
    storageLevel: StorageLevel
  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {

  lazy val channelFactoryExecutor =
    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
      setNameFormat("Flume Receiver Channel Thread - %d").build())

  lazy val channelFactory =
    new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor)

  lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())

  // Pool of open connections handed out to (and returned by) the fetcher tasks.
  private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()

  override def onStart(): Unit = {
    // Create the connections to each Flume agent.
    addresses.foreach { address =>
      val transceiver = new NettyTransceiver(address, channelFactory)
      val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
      connections.add(new FlumeConnection(transceiver, client))
    }
    // Log once, not once per worker thread.
    logInfo("Starting Flume Polling Receiver worker threads..")
    // Threads that pull data from Flume.
    (0 until parallelism).foreach { _ =>
      receiverExecutor.submit(new FlumeBatchFetcher(this))
    }
  }

  override def onStop(): Unit = {
    logInfo("Shutting down Flume Polling Receiver")
    receiverExecutor.shutdown()
    // Wait up to a minute for the threads to die
    if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
      receiverExecutor.shutdownNow()
    }
    connections.asScala.foreach(_.transceiver.close())
    // Drop the now-closed connections so they are never handed out again.
    connections.clear()
    channelFactory.releaseExternalResources()
  }

  private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
    this.connections
  }

  private[flume] def getMaxBatchSize: Int = {
    this.maxBatchSize
  }
}

/**
 * A wrapper around the transceiver and the Avro IPC API.
 * @param transceiver The transceiver to use for communication with Flume
 * @param client The client that the callbacks are received on.
 */
private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
    val client: SparkFlumeProtocol.Callback)

http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala new file mode 100644 index 0000000..945cfa7 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.spark.streaming.flume

import java.net.{InetSocketAddress, ServerSocket}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.{List => JList}
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.avro.ipc.NettyTransceiver
import org.apache.avro.ipc.specific.SpecificRequestor
import org.apache.commons.lang3.RandomUtils
import org.apache.flume.source.avro
import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.SocketChannel
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}

import org.apache.spark.util.Utils
import org.apache.spark.SparkConf

/**
 * Share codes for Scala and Python unit tests
 */
private[flume] class FlumeTestUtils {

  // Transceiver opened by the most recent writeInput call; null when none is open.
  private var transceiver: NettyTransceiver = null

  // Chosen once, when the utility is constructed.
  private val testPort: Int = findFreePort()

  def getTestPort(): Int = testPort

  /**
   * Picks a random starting port in [1024, 65536) and lets Spark's `startServiceOnPort` probe
   * it (and its retries) by opening and immediately closing a `ServerSocket`.
   */
  private def findFreePort(): Int = {
    val startingPort = RandomUtils.nextInt(1024, 65536)
    val probe = (port: Int) => {
      val socket = new ServerSocket(port)
      socket.close()
      (null, port)
    }
    val (_, boundPort) = Utils.startServiceOnPort(startingPort, probe, new SparkConf())
    boundPort
  }

  /**
   * Sends every string in `input` as one Avro batch to the receiver on `localhost:testPort`,
   * optionally through a zlib-compressing channel.
   *
   * @throws AssertionError if no client can be created or the batch is not acknowledged OK
   */
  def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
    val testAddress = new InetSocketAddress("localhost", testPort)

    val inputEvents = input.asScala.map(toAvroEvent)

    // Release whatever a previous call left open before opening a new transceiver.
    close()

    transceiver =
      if (enableCompression) new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
      else new NettyTransceiver(testAddress)

    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
    if (client == null) {
      throw new AssertionError("Cannot create client")
    }

    val status = client.appendBatch(inputEvents.asJava)
    if (status != avro.Status.OK) {
      throw new AssertionError("Sent events unsuccessfully")
    }
  }

  /** Builds an Avro event with a UTF-8 body and the single header `test -> header`. */
  private def toAvroEvent(item: String): AvroFlumeEvent = {
    val event = new AvroFlumeEvent
    event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
    event.setHeaders(Collections.singletonMap("test", "header"))
    event
  }

  /** Closes the current transceiver, if any; calling it again is a no-op. */
  def close(): Unit = {
    Option(transceiver).foreach(_.close())
    transceiver = null
  }

  /** Client channel factory whose pipelines inflate inbound and deflate outbound traffic. */
  private class CompressionChannelFactory(compressionLevel: Int)
    extends NioClientSocketChannelFactory {

    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
      // addFirst is applied twice, so "inflater" ends up ahead of "deflater".
      pipeline.addFirst("deflater", new ZlibEncoder(compressionLevel))
      pipeline.addFirst("inflater", new ZlibDecoder())
      super.newChannel(pipeline)
    }
  }

}
http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala new file mode 100644 index 0000000..3e3ed71 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.spark.streaming.flume

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.net.InetSocketAddress
import java.util.{List => JList, Map => JMap}

import scala.collection.JavaConverters._

import org.apache.spark.api.java.function.PairFunction
import org.apache.spark.api.python.PythonRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object FlumeUtils {
  private val DEFAULT_POLLING_PARALLELISM = 5
  private val DEFAULT_POLLING_BATCH_SIZE = 1000

  /**
   * Create an input stream from a Flume source.
   * @param ssc StreamingContext object
   * @param hostname Hostname of the slave machine to which the flume data will be sent
   * @param port Port of the slave machine to which the flume data will be sent
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createStream (
      ssc: StreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[SparkFlumeEvent] = {
    createStream(ssc, hostname, port, storageLevel, false)
  }

  /**
   * Create an input stream from a Flume source.
   * @param ssc StreamingContext object
   * @param hostname Hostname of the slave machine to which the flume data will be sent
   * @param port Port of the slave machine to which the flume data will be sent
   * @param storageLevel Storage level to use for storing the received objects
   * @param enableDecompression should netty server decompress input stream
   */
  def createStream (
      ssc: StreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel,
      enableDecompression: Boolean
    ): ReceiverInputDStream[SparkFlumeEvent] = {
    new FlumeInputDStream[SparkFlumeEvent](
      ssc, hostname, port, storageLevel, enableDecompression)
  }

  /**
   * Creates an input stream from a Flume source.
   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
   * @param jssc JavaStreamingContext object
   * @param hostname Hostname of the slave machine to which the flume data will be sent
   * @param port Port of the slave machine to which the flume data will be sent
   */
  def createStream(
      jssc: JavaStreamingContext,
      hostname: String,
      port: Int
    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
    createStream(jssc.ssc, hostname, port)
  }

  /**
   * Creates an input stream from a Flume source.
   * @param jssc JavaStreamingContext object
   * @param hostname Hostname of the slave machine to which the flume data will be sent
   * @param port Port of the slave machine to which the flume data will be sent
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createStream(
      jssc: JavaStreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel
    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
    createStream(jssc.ssc, hostname, port, storageLevel, false)
  }

  /**
   * Creates an input stream from a Flume source.
   * @param jssc JavaStreamingContext object
   * @param hostname Hostname of the slave machine to which the flume data will be sent
   * @param port Port of the slave machine to which the flume data will be sent
   * @param storageLevel Storage level to use for storing the received objects
   * @param enableDecompression should netty server decompress input stream
   */
  def createStream(
      jssc: JavaStreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel,
      enableDecompression: Boolean
    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
  }

  /**
   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
   * This stream will poll the sink for data and will pull events as they are available.
   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
   * @param hostname Address of the host on which the Spark Sink is running
   * @param port Port of the host at which the Spark Sink is listening
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createPollingStream(
      ssc: StreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[SparkFlumeEvent] = {
    createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel)
  }

  /**
   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
   * This stream will poll the sink for data and will pull events as they are available.
   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createPollingStream(
      ssc: StreamingContext,
      addresses: Seq[InetSocketAddress],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[SparkFlumeEvent] = {
    createPollingStream(ssc, addresses, storageLevel,
      DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
  }

  /**
   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
   * This stream will poll the sink for data and will pull events as they are available.
   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
   * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a
   *                     single RPC call
   * @param parallelism Number of concurrent requests this stream should send to the sink. Note
   *                    that having a higher number of requests concurrently being pulled will
   *                    result in this stream using more threads
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createPollingStream(
      ssc: StreamingContext,
      addresses: Seq[InetSocketAddress],
      storageLevel: StorageLevel,
      maxBatchSize: Int,
      parallelism: Int
    ): ReceiverInputDStream[SparkFlumeEvent] = {
    new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
      parallelism, storageLevel)
  }

  /**
   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
   * This stream will poll the sink for data and will pull events as they are available.
   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
   * @param hostname Hostname of the host on which the Spark Sink is running
   * @param port Port of the host at which the Spark Sink is listening
   */
  def createPollingStream(
      jssc: JavaStreamingContext,
      hostname: String,
      port: Int
    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
    createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2)
  }

  /**
   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
   * This stream will poll the sink for data and will pull events as they are available.
   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
   * @param hostname Hostname of the host on which the Spark Sink is running
   * @param port Port of the host at which the Spark Sink is listening
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createPollingStream(
      jssc: JavaStreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel
    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
    createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel)
  }

  /**
   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
   * This stream will poll the sink for data and will pull events as they are available.
   * This stream will use a batch size of 1000 events and run 5 threads to pull data.
   * @param addresses List of InetSocketAddresses on which the Spark Sink is running.
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createPollingStream(
      jssc: JavaStreamingContext,
      addresses: Array[InetSocketAddress],
      storageLevel: StorageLevel
    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
    createPollingStream(jssc, addresses, storageLevel,
      DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
  }

  /**
   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
   * This stream will poll the sink for data and will pull events as they are available.
   * @param addresses List of InetSocketAddresses on which the Spark Sink is running
   * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a
   *                     single RPC call
   * @param parallelism Number of concurrent requests this stream should send to the sink. Note
   *                    that having a higher number of requests concurrently being pulled will
   *                    result in this stream using more threads
   * @param storageLevel Storage level to use for storing the received objects
   */
  def createPollingStream(
      jssc: JavaStreamingContext,
      addresses: Array[InetSocketAddress],
      storageLevel: StorageLevel,
      maxBatchSize: Int,
      parallelism: Int
    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
    createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
  }
}

/**
 * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
 * function so that it can be easily instantiated and called from Python's FlumeUtils.
 */
private[flume] class FlumeUtilsPythonHelper {

  def createStream(
      jssc: JavaStreamingContext,
      hostname: String,
      port: Int,
      storageLevel: StorageLevel,
      enableDecompression: Boolean
    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
    val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
  }

  /**
   * Polling variant for Python: `hosts(i)` is paired with `ports(i)`, so both lists must have
   * the same size.
   */
  def createPollingStream(
      jssc: JavaStreamingContext,
      hosts: JList[String],
      ports: JList[Int],
      storageLevel: StorageLevel,
      maxBatchSize: Int,
      parallelism: Int
    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
    assert(hosts.size() == ports.size(),
      s"Number of hosts (${hosts.size()}) must equal number of ports (${ports.size()})")
    val addresses = hosts.asScala.zip(ports.asScala).map {
      case (host, port) => new InetSocketAddress(host, port)
    }
    val dstream = FlumeUtils.createPollingStream(
      jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
  }

}

private object FlumeUtilsPythonHelper {

  /**
   * Encodes a header map as an Int entry count followed by each key and value written with
   * `PythonRDD.writeUTF`.
   */
  private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
    val byteStream = new ByteArrayOutputStream()
    val output = new DataOutputStream(byteStream)
    try {
      output.writeInt(map.size)
      map.asScala.foreach { case (k, v) =>
        PythonRDD.writeUTF(k.toString, output)
        PythonRDD.writeUTF(v.toString, output)
      }
      byteStream.toByteArray
    } finally {
      output.close()
    }
  }

  /** Converts each event into a (serialized headers, body bytes) pair for Python. */
  private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
    JavaPairDStream[Array[Byte], Array[Byte]] = {
    dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
      override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
        val event = sparkEvent.event
        // Read through a duplicate: calling get() on the event's own buffer would advance its
        // position and leave the body empty for any later reader of the same event object.
        val byteBuffer = event.getBody.duplicate()
        val body = new Array[Byte](byteBuffer.remaining())
        byteBuffer.get(body)
        (stringMapToByteArray(event.getHeaders), body)
      }
    })
  }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala new file mode 100644 index 0000000..1a96df6 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import java.nio.charset.StandardCharsets +import java.util.{Collections, List => JList, Map => JMap} +import java.util.concurrent._ + +import scala.collection.mutable.ArrayBuffer + +import org.apache.flume.event.EventBuilder +import org.apache.flume.Context +import org.apache.flume.channel.MemoryChannel +import org.apache.flume.conf.Configurables + +import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig} + +/** + * Test utilities shared by the Scala and Python unit tests of the Flume polling stream. + * + * The helper starts `SparkSink`s, each reading from its own `MemoryChannel`, writes a fixed + * set of events into every channel and checks what Spark received against that set. + */ +private[flume] class PollingFlumeTestUtils { + + private val batchCount = 5 + val eventsPerBatch = 100 + private val totalEventsPerChannel = batchCount * eventsPerBatch + private val channelCapacity = 5000 + + /** Number of events `sendDatAndEnsureAllDataHasBeenReceived` writes over all channels. */ + def getTotalEvents: Int = totalEventsPerChannel * channels.size + + private val channels = new ArrayBuffer[MemoryChannel] + private val sinks = new ArrayBuffer[SparkSink] + + /** + * Start a single sink reading from a new memory channel and return the port of this sink. + * Previously tracked sinks and channels are forgotten, not stopped; call `close()` first. + */ + def startSingleSink(): Int = { + channels.clear() + sinks.clear() + + // Start the channel and sink. + val context = new Context() + context.put("capacity", channelCapacity.toString) + context.put("transactionCapacity", "1000") + context.put("keep-alive", "0") + val channel = new MemoryChannel() + Configurables.configure(channel, context) + + val sink = new SparkSink() + context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") + context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0)) + Configurables.configure(sink, context) + sink.setChannel(channel) + sink.start() + + channels += channel + sinks += sink + + sink.getPort() + } + + /** + * Start two sinks, each reading from its own memory channel, and return their ports. + * Previously tracked sinks and channels are forgotten, not stopped; call `close()` first. + */ + def startMultipleSinks(): Seq[Int] = { + channels.clear() + sinks.clear() + + // Start the channel and sink.
+ val context = new Context() + context.put("capacity", channelCapacity.toString) + context.put("transactionCapacity", "1000") + context.put("keep-alive", "0") + val channel = new MemoryChannel() + Configurables.configure(channel, context) + + val channel2 = new MemoryChannel() + Configurables.configure(channel2, context) + + val sink = new SparkSink() + context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") + context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0)) + Configurables.configure(sink, context) + sink.setChannel(channel) + sink.start() + + val sink2 = new SparkSink() + context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost") + context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0)) + Configurables.configure(sink2, context) + sink2.setChannel(channel2) + sink2.start() + + sinks += sink + sinks += sink2 + channels += channel + channels += channel2 + + sinks.map(_.getPort()) + } + + /** + * Write `batchCount` transactions of `eventsPerBatch` events into every channel, then wait + * up to 15 seconds for the latch (one count per expected batch) that the sinks count down + * via `countdownWhenBatchReceived`. + * + * A failure while writing to a channel is rethrown here, wrapped in an `ExecutionException`. + * A latch timeout is not reported; callers verify the data afterwards, e.g. with `assertOutput`. + */ + def sendDatAndEnsureAllDataHasBeenReceived(): Unit = { + val executor = Executors.newCachedThreadPool() + try { + val executorCompletion = new ExecutorCompletionService[Void](executor) + + val latch = new CountDownLatch(batchCount * channels.size) + sinks.foreach(_.countdownWhenBatchReceived(latch)) + + channels.foreach(channel => { + executorCompletion.submit(new TxnSubmitter(channel)) + }) + + // take() only waits for a submitter to finish; get() also surfaces any exception it threw. + channels.foreach(_ => executorCompletion.take().get()) + + latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received. + } finally { + // Release the pool's threads instead of leaving them idle after every call. + executor.shutdownNow() + } + }
+ + /** + * A Python-friendly method to assert the output: every event written by + * `sendDatAndEnsureAllDataHasBeenReceived` (body `<channel name>-<i>`, header `test-<i>` -> + * `header`) must appear in the given headers/bodies, and nothing else. + * + * @param outputHeaders headers of the received events, aligned by index with `outputBodies` + * @param outputBodies bodies of the received events, decoded as strings + * @throws AssertionError if the number or the content of the received events is wrong + */ + def assertOutput( + outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = { + require(outputHeaders.size == outputBodies.size) + val eventSize = outputHeaders.size + if (eventSize != totalEventsPerChannel * channels.size) { + throw new AssertionError( + s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize") + } + var counter = 0 + for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) { + val eventBodyToVerify = s"${channels(k).getName}-$i" + val eventHeaderToVerify: JMap[String, String] = Collections.singletonMap(s"test-$i", "header") + var found = false + var j = 0 + while (j < eventSize && !found) { + if (eventBodyToVerify == outputBodies.get(j) && + eventHeaderToVerify == outputHeaders.get(j)) { + found = true + counter += 1 + } + j += 1 + } + } + if (counter != totalEventsPerChannel * channels.size) { + throw new AssertionError( + s"Expected ${totalEventsPerChannel * channels.size} events, but was $counter") + } + } + + /** @throws AssertionError if any tracked channel does not appear to be empty */ + def assertChannelsAreEmpty(): Unit = { + channels.foreach(assertChannelIsEmpty) + } + + private def assertChannelIsEmpty(channel: MemoryChannel): Unit = { + // MemoryChannel's private `queueRemaining` field exposes `availablePermits`; the channel is + // treated as empty when that equals `channelCapacity`. Relies on Flume internals (reflection). + val queueRemaining = channel.getClass.getDeclaredField("queueRemaining") + queueRemaining.setAccessible(true) + val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits") + if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != channelCapacity) { + throw new AssertionError(s"Channel ${channel.getName} is not empty") + } + } + + /** Stop and forget all tracked sinks and channels. */ + def close(): Unit = { + sinks.foreach(_.stop()) + sinks.clear() + channels.foreach(_.stop()) + channels.clear() + } + + /** + * Writes `batchCount` committed transactions of `eventsPerBatch` events into `channel`, + * sleeping 500 ms after each commit. Event `t` (0-based, per channel) has the body + * `<channel name>-t` and the single header `test-t` -> `header`. + */ + private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] { + override def call(): Void = { + var t = 0 + for (_ <- 0 until batchCount) { + val tx = channel.getTransaction + tx.begin() + for (_ <- 0 until eventsPerBatch) { + channel.put(EventBuilder.withBody( +
s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8), + Collections.singletonMap(s"test-$t", "header"))) + t += 1 + } + tx.commit() + tx.close() + Thread.sleep(500) // Allow some time for the events to reach the sink + } + null + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java new file mode 100644 index 0000000..d31aa5f --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Spark streaming receiver for Flume.
+ */ +package org.apache.spark.streaming.flume; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala new file mode 100644 index 0000000..9bfab68 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +/** + * Spark streaming receiver for Flume. 
+ */ +package object flume http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java ---------------------------------------------------------------------- diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000..cfedb5a --- /dev/null +++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming; + +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +/** + * Base class for Java streaming tests: before each test it creates a local JavaStreamingContext + * (master "local[2]", 1000 ms batch duration, manual clock, checkpoint directory "checkpoint") + * and after each test it stops that context. + */ +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + SparkConf conf = new SparkConf() + .setMaster("local[2]") + .setAppName("test") + .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); + ssc = new JavaStreamingContext(conf, new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + // JUnit runs @After methods even when @Before failed, so ssc may never have been created; + // a NullPointerException here would hide the original failure. + if (ssc != null) { + ssc.stop(); + ssc = null; + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java new file mode 100644 index 0000000..79c5b91 --- /dev/null +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume; + +import java.net.InetSocketAddress; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; + +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.junit.Test; + +/** + * Java API smoke test for the polling Flume stream: every createPollingStream overload is invoked + * from Java. No sink is started and no data is received. + */ +public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext { + @Test + public void testFlumeStream() { + String host = "localhost"; + int port = 12345; + InetSocketAddress[] sinkAddresses = { new InetSocketAddress(host, port) }; + + JavaReceiverInputDStream<SparkFlumeEvent> byHostAndPort = + FlumeUtils.createPollingStream(ssc, host, port); + JavaReceiverInputDStream<SparkFlumeEvent> byHostPortAndLevel = + FlumeUtils.createPollingStream(ssc, host, port, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream<SparkFlumeEvent> byAddresses = + FlumeUtils.createPollingStream(ssc, sinkAddresses, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream<SparkFlumeEvent> fullySpecified = + FlumeUtils.createPollingStream( + ssc, sinkAddresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java new file mode 100644 index 0000000..3b5e0c7 --- /dev/null +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; + +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.junit.Test; + +/** + * Java API smoke test for FlumeUtils.createStream: every overload is invoked from Java. + * No Flume agent is involved and no data is received. + */ +public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { + @Test + public void testFlumeStream() { + String host = "localhost"; + int port = 12345; + + JavaReceiverInputDStream<SparkFlumeEvent> withDefaults = + FlumeUtils.createStream(ssc, host, port); + JavaReceiverInputDStream<SparkFlumeEvent> withStorageLevel = + FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream<SparkFlumeEvent> withDecompressionFlag = + FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_AND_DISK_SER_2(), false); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties new file mode 100644 index 0000000..75e3b53 --- /dev/null +++ b/external/flume/src/test/resources/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software
Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.spark-project.jetty=WARN + http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala new file mode 100644 index 0000000..c97a27c --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import java.io.{IOException, ObjectInputStream} +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.reflect.ClassTag + +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.dstream.{DStream, ForEachDStream} +import org.apache.spark.util.Utils + +/** + * An output stream just for the test suites. The items of every RDD of `parent` are collected + * and appended, as one `Seq` per RDD, to the `output` queue (a `ConcurrentLinkedQueue`). The + * queue is cleared whenever the stream is deserialized, e.g. when restored from a checkpoint.
+ * + * @param parent the stream whose RDDs are collected + * @param output the queue receiving one collected `Seq` of items per RDD + */ +class TestOutputStream[T: ClassTag](parent: DStream[T], + val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]()) + extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { + val collected = rdd.collect() + output.add(collected) + }, false) { + + // Clear the output queue every time this stream is deserialized (e.g. read from a checkpoint). + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { + ois.defaultReadObject() + output.clear() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala new file mode 100644 index 0000000..1567124 --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.flume + +import java.net.InetSocketAddress +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream} +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.util.{ManualClock, Utils} + +/** + * Tests the polling Flume stream against one and against two `SparkSink`s. + */ +class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging { + + val maxAttempts = 5 + val batchDuration = Seconds(1) + + val conf = new SparkConf() + .setMaster("local[2]") + .setAppName(this.getClass.getSimpleName) + .set("spark.streaming.clock", "org.apache.spark.util.ManualClock") + + val utils = new PollingFlumeTestUtils + + test("flume polling test") { + testMultipleTimes(testFlumePolling) + } + + test("flume polling test multiple hosts") { + testMultipleTimes(testFlumePollingMultipleHost) + } + + /** + * Run the given test, retrying it while it fails with a port bind collision (as detected by + * `Utils.isBindCollision`), up to `maxAttempts` attempts. Any other exception fails immediately.
+ */ + private def testMultipleTimes(test: () => Unit): Unit = { + var testPassed = false + var attempt = 0 + while (!testPassed && attempt < maxAttempts) { + try { + test() + testPassed = true + } catch { + case e: Exception if Utils.isBindCollision(e) => + logWarning(s"Exception when running flume polling test: $e") + attempt += 1 + } + } + assert(testPassed, s"Test failed after $attempt attempts!") + } + + private def testFlumePolling(): Unit = { + try { + val port = utils.startSingleSink() + + writeAndVerify(Seq(port)) + utils.assertChannelsAreEmpty() + } finally { + utils.close() + } + } + + private def testFlumePollingMultipleHost(): Unit = { + try { + val ports = utils.startMultipleSinks() + writeAndVerify(ports) + utils.assertChannelsAreEmpty() + } finally { + utils.close() + } + } + + /** + * Start a streaming context polling the sinks on `sinkPorts`, push the test events through the + * sinks and verify that all of them were received. The context is stopped in every case, + * including when setting up the stream fails, so a failed setup never leaves it running. + */ + def writeAndVerify(sinkPorts: Seq[Int]): Unit = { + // Set up the streaming context and input streams + val ssc = new StreamingContext(conf, batchDuration) + try { + val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port)) + val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = + FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, + utils.eventsPerBatch, 5) + val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputQueue) + outputStream.register() + + ssc.start() + utils.sendDatAndEnsureAllDataHasBeenReceived() + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + clock.advance(batchDuration.milliseconds) + + // The eventually is required to ensure that all data in the batch has been processed.
+ eventually(timeout(10 seconds), interval(100 milliseconds)) { + val flattenOutput = outputQueue.asScala.toSeq.flatten + val headers = flattenOutput.map(_.event.getHeaders.asScala.map { + case (key, value) => (key.toString, value.toString) + }).map(_.asJava) + val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody)) + utils.assertOutput(headers.asJava, bodies.asJava) + } + } finally { + ssc.stop() + } + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala new file mode 100644 index 0000000..7bac1cc --- /dev/null +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.streaming.flume + +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps + +import org.jboss.netty.channel.ChannelPipeline +import org.jboss.netty.channel.socket.SocketChannel +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory +import org.jboss.netty.handler.codec.compression._ +import org.scalatest.{BeforeAndAfter, Matchers} +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.Logging +import org.apache.spark.network.util.JavaUtils +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream} + +/** + * Tests for `FlumeUtils.createStream`, with and without compression. + */ +class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { + val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite") + var ssc: StreamingContext = null + + test("flume input stream") { + testFlumeStream(testCompression = false) + } + + test("flume input compressed stream") { + testFlumeStream(testCompression = true) + } + + /** + * Write the events "1" to "100" to a Flume stream (optionally compressed) and check that the + * bodies arrive in order and that every event carries the header `test` -> `header`. + */ + private def testFlumeStream(testCompression: Boolean): Unit = { + val input = (1 to 100).map { _.toString } + val utils = new FlumeTestUtils + try { + val outputQueue = startContext(utils.getTestPort(), testCompression) + + eventually(timeout(10 seconds), interval(100 milliseconds)) { + utils.writeInput(input.asJava, testCompression) + } + + eventually(timeout(10 seconds), interval(100 milliseconds)) { + val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event } + outputEvents.foreach { + event => + event.getHeaders.get("test") should be("header") + } + val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody)) + output should be (input) + } + } finally { + // Release the test utilities even if stopping the context fails, and reset the shared + // `ssc` field so the next test never finds an already stopped context in it. + try { + if (ssc != null) { + ssc.stop() + } + } finally { + ssc = null + utils.close() + } + } + } + +
+ /** + * Create and start a streaming context with 200 ms batches whose Flume stream receives events + * on `localhost:testPort`, and store it in `ssc`. + * + * @return the queue that collects the events of every batch + */ + private def startContext( + testPort: Int, testCompression: Boolean): ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] = { + ssc = new StreamingContext(conf, Milliseconds(200)) + val flumeStream = FlumeUtils.createStream( + ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression) + val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputQueue) + outputStream.register() + ssc.start() + outputQueue + } + + /** + * Client socket channel factory that adds zlib compression to every new channel's pipeline. + * Not referenced by the tests in this suite. + */ + private class CompressionChannelFactory(compressionLevel: Int) + extends NioClientSocketChannelFactory { + + override def newChannel(pipeline: ChannelPipeline): SocketChannel = { + val encoder = new ZlibEncoder(compressionLevel) + pipeline.addFirst("deflater", encoder) + pipeline.addFirst("inflater", new ZlibDecoder()) + super.newChannel(pipeline) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 92a32e7..b4cfa3a 100644 --- a/pom.xml +++ b/pom.xml @@ -101,6 +101,9 @@ <module>sql/hive</module> <module>external/docker-integration-tests</module> <module>assembly</module> + <module>external/flume</module> + <module>external/flume-sink</module> + <module>external/flume-assembly</module> <module>examples</module> <module>repl</module> <module>launcher</module> @@ -123,6 +126,7 @@ <yarn.version>${hadoop.version}</yarn.version> <hbase.version>0.98.17-hadoop2</hbase.version> <hbase.artifact>hbase</hbase.artifact> + <flume.version>1.6.0</flume.version> <zookeeper.version>3.4.5</zookeeper.version> <curator.version>2.4.0</curator.version> <hive.group>org.spark-project.hive</hive.group> @@ -189,6 +193,7 @@ during compilation if the dependency is transivite (e.g. "graphx/" depending on "core/" and needing Hadoop classes in the classpath to compile).
--> + <flume.deps.scope>compile</flume.deps.scope> <hadoop.deps.scope>compile</hadoop.deps.scope> <hbase.deps.scope>compile</hbase.deps.scope> <hive.deps.scope>compile</hive.deps.scope> @@ -1591,6 +1596,46 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>${flume.version}</version> + <scope>${flume.deps.scope}</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-auth</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-sdk</artifactId> + <version>${flume.version}</version> + <scope>${flume.deps.scope}</scope> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> <version>${calcite.version}</version> @@ -2442,6 +2487,9 @@ that does not have them. 
--> <profile> + <id>flume-provided</id> + </profile> + <profile> <id>hadoop-provided</id> </profile> <profile> http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index dbe98d1..fb229b9 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -39,9 +39,9 @@ object BuildCommons { ).map(ProjectRef(buildLocation, _)) val streamingProjects@Seq( - streaming, streamingKafka + streaming, streamingFlumeSink, streamingFlume, streamingKafka ) = Seq( - "streaming", "streaming-kafka" + "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka" ).map(ProjectRef(buildLocation, _)) val allProjects@Seq( @@ -56,8 +56,8 @@ object BuildCommons { Seq("yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests").map(ProjectRef(buildLocation, _)) - val assemblyProjects@Seq(assembly, networkYarn, streamingKafkaAssembly, streamingKinesisAslAssembly) = - Seq("assembly", "network-yarn", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly") + val assemblyProjects@Seq(assembly, networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKinesisAslAssembly) = + Seq("assembly", "network-yarn", "streaming-flume-assembly", "streaming-kafka-assembly", "streaming-kinesis-asl-assembly") .map(ProjectRef(buildLocation, _)) val copyJarsProjects@Seq(examples) = Seq("examples").map(ProjectRef(buildLocation, _)) @@ -283,6 +283,8 @@ object SparkBuild extends PomBuild { /* Hive console settings */ enable(Hive.settings)(hive) + enable(Flume.settings)(streamingFlumeSink) + enable(Java8TestSettings.settings)(java8Tests) enable(DockerIntegrationTests.settings)(dockerIntegrationTests) @@ -348,6 +350,10 @@ object Unsafe { ) } +object Flume { + lazy val settings = sbtavro.SbtAvro.avroSettings +} + object DockerIntegrationTests { // This serves to override the override 
specified in DependencyOverrides: lazy val settings = Seq( @@ -526,7 +532,7 @@ object Assembly { .getOrElse(SbtPomKeys.effectivePom.value.getProperties.get("hadoop.version").asInstanceOf[String]) }, jarName in assembly <<= (version, moduleName, hadoopVersion) map { (v, mName, hv) => - if (mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) { + if (mName.contains("streaming-flume-assembly") || mName.contains("streaming-kafka-assembly") || mName.contains("streaming-kinesis-asl-assembly")) { // This must match the same name used in maven (see external/kafka-assembly/pom.xml) s"${mName}-${v}.jar" } else { @@ -644,9 +650,9 @@ object Unidoc { publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, yarn, testTags), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, testTags), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, yarn, testTags), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, testTags), // Skip actual catalyst, but include the subproject. // Catalyst is not public API and contains quasiquotes which break scaladoc. 
@@ -665,7 +671,7 @@ object Unidoc { "-public", "-group", "Core Java API", packageList("api.java", "api.java.function"), "-group", "Spark Streaming", packageList( - "streaming.api.java", "streaming.kafka", "streaming.kinesis" + "streaming.api.java", "streaming.flume", "streaming.kafka", "streaming.kinesis" ), "-group", "MLlib", packageList( "mllib.classification", "mllib.clustering", "mllib.evaluation.binary", "mllib.linalg", http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/python/pyspark/streaming/flume.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py new file mode 100644 index 0000000..cd30483 --- /dev/null +++ b/python/pyspark/streaming/flume.py @@ -0,0 +1,140 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import sys +if sys.version >= "3": + from io import BytesIO +else: + from StringIO import StringIO +from py4j.protocol import Py4JJavaError + +from pyspark.storagelevel import StorageLevel +from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int +from pyspark.streaming import DStream + +__all__ = ['FlumeUtils', 'utf8_decoder'] + + +def utf8_decoder(s): + """ Decode the unicode as UTF-8 """ + if s is None: + return None + return s.decode('utf-8') + + +class FlumeUtils(object): + + @staticmethod + def createStream(ssc, hostname, port, + storageLevel=StorageLevel.MEMORY_AND_DISK_2, + enableDecompression=False, + bodyDecoder=utf8_decoder): + """ + Create an input stream that pulls events from Flume. + + :param ssc: StreamingContext object + :param hostname: Hostname of the slave machine to which the flume data will be sent + :param port: Port of the slave machine to which the flume data will be sent + :param storageLevel: Storage level to use for storing the received objects + :param enableDecompression: Should netty server decompress input stream + :param bodyDecoder: A function used to decode body (default is utf8_decoder) + :return: A DStream object + """ + jlevel = ssc._sc._getJavaStorageLevel(storageLevel) + helper = FlumeUtils._get_helper(ssc._sc) + jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression) + return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder) + + @staticmethod + def createPollingStream(ssc, addresses, + storageLevel=StorageLevel.MEMORY_AND_DISK_2, + maxBatchSize=1000, + parallelism=5, + bodyDecoder=utf8_decoder): + """ + Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + This stream will poll the sink for data and will pull events as they are available. + + :param ssc: StreamingContext object + :param addresses: List of (host, port)s on which the Spark Sink is running. 
+ :param storageLevel: Storage level to use for storing the received objects + :param maxBatchSize: The maximum number of events to be pulled from the Spark sink + in a single RPC call + :param parallelism: Number of concurrent requests this stream should send to the sink. + Note that having a higher number of requests concurrently being pulled + will result in this stream using more threads + :param bodyDecoder: A function used to decode body (default is utf8_decoder) + :return: A DStream object + """ + jlevel = ssc._sc._getJavaStorageLevel(storageLevel) + hosts = [] + ports = [] + for (host, port) in addresses: + hosts.append(host) + ports.append(port) + helper = FlumeUtils._get_helper(ssc._sc) + jstream = helper.createPollingStream( + ssc._jssc, hosts, ports, jlevel, maxBatchSize, parallelism) + return FlumeUtils._toPythonDStream(ssc, jstream, bodyDecoder) + + @staticmethod + def _toPythonDStream(ssc, jstream, bodyDecoder): + ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) + stream = DStream(jstream, ssc, ser) + + def func(event): + headersBytes = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0]) + headers = {} + strSer = UTF8Deserializer() + for i in range(0, read_int(headersBytes)): + key = strSer.loads(headersBytes) + value = strSer.loads(headersBytes) + headers[key] = value + body = bodyDecoder(event[1]) + return (headers, body) + return stream.map(func) + + @staticmethod + def _get_helper(sc): + try: + return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper() + except TypeError as e: + if str(e) == "'JavaPackage' object is not callable": + FlumeUtils._printErrorMsg(sc) + raise + + @staticmethod + def _printErrorMsg(sc): + print(""" +________________________________________________________________________________________________ + + Spark Streaming's Flume libraries not found in class path. Try one of the following. + + 1. 
Include the Flume library and its dependencies with in the + spark-submit command as + + $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ... + + 2. Download the JAR of the artifact from Maven Central http://search.maven.org/, + Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s. + Then, include the jar in the spark-submit command as + + $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ... + +________________________________________________________________________________________________ + +""" % (sc.version, sc.version)) http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index eb4696c..d010c0e 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -45,6 +45,7 @@ from pyspark.context import SparkConf, SparkContext, RDD from pyspark.storagelevel import StorageLevel from pyspark.streaming.context import StreamingContext from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition +from pyspark.streaming.flume import FlumeUtils from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream from pyspark.streaming.listener import StreamingListener @@ -1260,6 +1261,148 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream) +class FlumeStreamTests(PySparkStreamingTestCase): + timeout = 20 # seconds + duration = 1 + + def setUp(self): + super(FlumeStreamTests, self).setUp() + self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils() + + def tearDown(self): + if self._utils is not None: + self._utils.close() + self._utils = None + + super(FlumeStreamTests, self).tearDown() + + def _startContext(self, n, compressed): + # Start the StreamingContext and also 
collect the result + dstream = FlumeUtils.createStream(self.ssc, "localhost", self._utils.getTestPort(), + enableDecompression=compressed) + result = [] + + def get_output(_, rdd): + for event in rdd.collect(): + if len(result) < n: + result.append(event) + dstream.foreachRDD(get_output) + self.ssc.start() + return result + + def _validateResult(self, input, result): + # Validate both the header and the body + header = {"test": "header"} + self.assertEqual(len(input), len(result)) + for i in range(0, len(input)): + self.assertEqual(header, result[i][0]) + self.assertEqual(input[i], result[i][1]) + + def _writeInput(self, input, compressed): + # Try to write input to the receiver until success or timeout + start_time = time.time() + while True: + try: + self._utils.writeInput(input, compressed) + break + except: + if time.time() - start_time < self.timeout: + time.sleep(0.01) + else: + raise + + def test_flume_stream(self): + input = [str(i) for i in range(1, 101)] + result = self._startContext(len(input), False) + self._writeInput(input, False) + self.wait_for(result, len(input)) + self._validateResult(input, result) + + def test_compressed_flume_stream(self): + input = [str(i) for i in range(1, 101)] + result = self._startContext(len(input), True) + self._writeInput(input, True) + self.wait_for(result, len(input)) + self._validateResult(input, result) + + +class FlumePollingStreamTests(PySparkStreamingTestCase): + timeout = 20 # seconds + duration = 1 + maxAttempts = 5 + + def setUp(self): + self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils() + + def tearDown(self): + if self._utils is not None: + self._utils.close() + self._utils = None + + def _writeAndVerify(self, ports): + # Set up the streaming context and input streams + ssc = StreamingContext(self.sc, self.duration) + try: + addresses = [("localhost", port) for port in ports] + dstream = FlumeUtils.createPollingStream( + ssc, + addresses, + 
maxBatchSize=self._utils.eventsPerBatch(), + parallelism=5) + outputBuffer = [] + + def get_output(_, rdd): + for e in rdd.collect(): + outputBuffer.append(e) + + dstream.foreachRDD(get_output) + ssc.start() + self._utils.sendDatAndEnsureAllDataHasBeenReceived() + + self.wait_for(outputBuffer, self._utils.getTotalEvents()) + outputHeaders = [event[0] for event in outputBuffer] + outputBodies = [event[1] for event in outputBuffer] + self._utils.assertOutput(outputHeaders, outputBodies) + finally: + ssc.stop(False) + + def _testMultipleTimes(self, f): + attempt = 0 + while True: + try: + f() + break + except: + attempt += 1 + if attempt >= self.maxAttempts: + raise + else: + import traceback + traceback.print_exc() + + def _testFlumePolling(self): + try: + port = self._utils.startSingleSink() + self._writeAndVerify([port]) + self._utils.assertChannelsAreEmpty() + finally: + self._utils.close() + + def _testFlumePollingMultipleHosts(self): + try: + port = self._utils.startSingleSink() + self._writeAndVerify([port]) + self._utils.assertChannelsAreEmpty() + finally: + self._utils.close() + + def test_flume_polling(self): + self._testMultipleTimes(self._testFlumePolling) + + def test_flume_polling_multiple_hosts(self): + self._testMultipleTimes(self._testFlumePollingMultipleHosts) + + class KinesisStreamTests(PySparkStreamingTestCase): def test_kinesis_stream_api(self): @@ -1348,6 +1491,23 @@ def search_kafka_assembly_jar(): return jars[0] +def search_flume_assembly_jar(): + SPARK_HOME = os.environ["SPARK_HOME"] + flume_assembly_dir = os.path.join(SPARK_HOME, "external/flume-assembly") + jars = search_jar(flume_assembly_dir, "spark-streaming-flume-assembly") + if not jars: + raise Exception( + ("Failed to find Spark Streaming Flume assembly jar in %s. 
" % flume_assembly_dir) + + "You need to build Spark with " + "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or " + "'build/mvn package' before running this test.") + elif len(jars) > 1: + raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please " + "remove all but one") % (", ".join(jars))) + else: + return jars[0] + + def search_kinesis_asl_assembly_jar(): SPARK_HOME = os.environ["SPARK_HOME"] kinesis_asl_assembly_dir = os.path.join(SPARK_HOME, "external/kinesis-asl-assembly") @@ -1368,18 +1528,20 @@ are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1' if __name__ == "__main__": from pyspark.streaming.tests import * kafka_assembly_jar = search_kafka_assembly_jar() + flume_assembly_jar = search_flume_assembly_jar() kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar() if kinesis_asl_assembly_jar is None: kinesis_jar_present = False - jars = kafka_assembly_jar + jars = "%s,%s" % (kafka_assembly_jar, flume_assembly_jar) else: kinesis_jar_present = True - jars = "%s,%s" % (kafka_assembly_jar, kinesis_asl_assembly_jar) + jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar) os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests, - KafkaStreamTests, StreamingListenerTests] + KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, + StreamingListenerTests] if kinesis_jar_present is True: testcases.append(KinesisStreamTests) http://git-wip-us.apache.org/repos/asf/spark/blob/24587ce4/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 10ee7d5..1eb680d 100644 --- 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -18,6 +18,8 @@ package test.org.apache.spark.sql; import java.io.Serializable; +import java.net.URISyntaxException; +import java.net.URL; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -303,27 +305,42 @@ public class JavaDataFrameSuite { Assert.assertEquals(30000.0, actual.get(1).getDouble(2), 0.01); } + private String getResource(String resource) { + try { + // The following "getResource" has different behaviors in SBT and Maven. + // When running in Jenkins, the file path may contain "@" when there are multiple + // SparkPullRequestBuilders running in the same worker + // (e.g., /home/jenkins/workspace/SparkPullRequestBuilder@2) + // When running in SBT, "@" in the file path will be returned as "@", however, + // when running in Maven, "@" will be encoded as "%40". + // Therefore, we convert it to URI then call "getPath" to decode it back so that it can both + // work both in SBT and Maven. 
+ URL url = Thread.currentThread().getContextClassLoader().getResource(resource); + return url.toURI().getPath(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + @Test public void testGenericLoad() { - Dataset<Row> df1 = context.read().format("text").load( - Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString()); + Dataset<Row> df1 = context.read().format("text").load(getResource("text-suite.txt")); Assert.assertEquals(4L, df1.count()); Dataset<Row> df2 = context.read().format("text").load( - Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(), - Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString()); + getResource("text-suite.txt"), + getResource("text-suite2.txt")); Assert.assertEquals(5L, df2.count()); } @Test public void testTextLoad() { - Dataset<String> ds1 = context.read().text( - Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString()); + Dataset<String> ds1 = context.read().text(getResource("text-suite.txt")); Assert.assertEquals(4L, ds1.count()); Dataset<String> ds2 = context.read().text( - Thread.currentThread().getContextClassLoader().getResource("text-suite.txt").toString(), - Thread.currentThread().getContextClassLoader().getResource("text-suite2.txt").toString()); + getResource("text-suite.txt"), + getResource("text-suite2.txt")); Assert.assertEquals(5L, ds2.count()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
