http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
 
b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
deleted file mode 100644
index e8ca1e7..0000000
--- 
a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume.sink
-
-import java.net.InetSocketAddress
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.JavaConverters._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.event.EventBuilder
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-
-// Due to MNG-1378, there is not a way to include test dependencies 
transitively.
-// We cannot include Spark core tests as a dependency here because it depends 
on
-// Spark core main, which has too many dependencies to require here manually.
-// For this reason, we continue to use FunSuite and ignore the scalastyle 
checks
-// that fail if this is detected.
-// scalastyle:off
-import org.scalatest.FunSuite
-
-class SparkSinkSuite extends FunSuite {
-// scalastyle:on
-
-  val eventsPerBatch = 1000
-  val channelCapacity = 5000
-
-  test("Success with ack") {
-    val (channel, sink, latch) = initializeChannelAndSink()
-    channel.start()
-    sink.start()
-
-    putEvents(channel, eventsPerBatch)
-
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-
-    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
-    val events = client.getEventBatch(1000)
-    client.ack(events.getSequenceNumber)
-    assert(events.getEvents.size() === 1000)
-    latch.await(1, TimeUnit.SECONDS)
-    assertChannelIsEmpty(channel)
-    sink.stop()
-    channel.stop()
-    transceiver.close()
-  }
-
-  test("Failure with nack") {
-    val (channel, sink, latch) = initializeChannelAndSink()
-    channel.start()
-    sink.start()
-    putEvents(channel, eventsPerBatch)
-
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-
-    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
-    val events = client.getEventBatch(1000)
-    assert(events.getEvents.size() === 1000)
-    client.nack(events.getSequenceNumber)
-    latch.await(1, TimeUnit.SECONDS)
-    assert(availableChannelSlots(channel) === 4000)
-    sink.stop()
-    channel.stop()
-    transceiver.close()
-  }
-
-  test("Failure with timeout") {
-    val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig
-      .CONF_TRANSACTION_TIMEOUT -> 1.toString))
-    channel.start()
-    sink.start()
-    putEvents(channel, eventsPerBatch)
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-
-    val (transceiver, client) = getTransceiverAndClient(address, 1)(0)
-    val events = client.getEventBatch(1000)
-    assert(events.getEvents.size() === 1000)
-    latch.await(1, TimeUnit.SECONDS)
-    assert(availableChannelSlots(channel) === 4000)
-    sink.stop()
-    channel.stop()
-    transceiver.close()
-  }
-
-  test("Multiple consumers") {
-    testMultipleConsumers(failSome = false)
-  }
-
-  test("Multiple consumers with some failures") {
-    testMultipleConsumers(failSome = true)
-  }
-
-  def testMultipleConsumers(failSome: Boolean): Unit = {
-    implicit val executorContext = ExecutionContext
-      .fromExecutorService(Executors.newFixedThreadPool(5))
-    val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5)
-    channel.start()
-    sink.start()
-    (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch))
-    val port = sink.getPort
-    val address = new InetSocketAddress("0.0.0.0", port)
-    val transceiversAndClients = getTransceiverAndClient(address, 5)
-    val batchCounter = new CountDownLatch(5)
-    val counter = new AtomicInteger(0)
-    transceiversAndClients.foreach(x => {
-      Future {
-        val client = x._2
-        val events = client.getEventBatch(1000)
-        if (!failSome || counter.getAndIncrement() % 2 == 0) {
-          client.ack(events.getSequenceNumber)
-        } else {
-          client.nack(events.getSequenceNumber)
-          throw new RuntimeException("Sending NACK for failure!")
-        }
-        events
-      }.onComplete {
-        case Success(events) =>
-          assert(events.getEvents.size() === 1000)
-          batchCounter.countDown()
-        case Failure(t) =>
-          // Don't re-throw the exception, causes a nasty unnecessary stack 
trace on stdout
-          batchCounter.countDown()
-      }
-    })
-    batchCounter.await()
-    latch.await(1, TimeUnit.SECONDS)
-    executorContext.shutdown()
-    if(failSome) {
-      assert(availableChannelSlots(channel) === 3000)
-    } else {
-      assertChannelIsEmpty(channel)
-    }
-    sink.stop()
-    channel.stop()
-    transceiversAndClients.foreach(x => x._1.close())
-  }
-
-  private def initializeChannelAndSink(overrides: Map[String, String] = 
Map.empty,
-    batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = {
-    val channel = new MemoryChannel()
-    val channelContext = new Context()
-
-    channelContext.put("capacity", channelCapacity.toString)
-    channelContext.put("transactionCapacity", 1000.toString)
-    channelContext.put("keep-alive", 0.toString)
-    channelContext.putAll(overrides.asJava)
-    channel.setName(scala.util.Random.nextString(10))
-    channel.configure(channelContext)
-
-    val sink = new SparkSink()
-    val sinkContext = new Context()
-    sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0")
-    sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString)
-    sink.configure(sinkContext)
-    sink.setChannel(channel)
-    val latch = new CountDownLatch(batchCounter)
-    sink.countdownWhenBatchReceived(latch)
-    (channel, sink, latch)
-  }
-
-  private def putEvents(ch: MemoryChannel, count: Int): Unit = {
-    val tx = ch.getTransaction
-    tx.begin()
-    (1 to count).foreach(x =>
-      
ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8))))
-    tx.commit()
-    tx.close()
-  }
-
-  private def getTransceiverAndClient(address: InetSocketAddress,
-    count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = {
-
-    (1 to count).map(_ => {
-      lazy val channelFactoryExecutor = Executors.newCachedThreadPool(
-        new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d"))
-      lazy val channelFactory =
-        new NioClientSocketChannelFactory(channelFactoryExecutor, 
channelFactoryExecutor)
-      val transceiver = new NettyTransceiver(address, channelFactory)
-      val client = 
SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-      (transceiver, client)
-    })
-  }
-
-  private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
-    assert(availableChannelSlots(channel) === channelCapacity)
-  }
-
-  private def availableChannelSlots(channel: MemoryChannel): Int = {
-    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
-    queueRemaining.setAccessible(true)
-    val m = 
queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
-    m.invoke(queueRemaining.get(channel)).asInstanceOf[Int]
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/pom.xml
----------------------------------------------------------------------
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
deleted file mode 100644
index 1410ef7..0000000
--- a/external/flume/pom.xml
+++ /dev/null
@@ -1,89 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>3.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>spark-streaming-flume_2.11</artifactId>
-  <properties>
-    <sbt.project.name>streaming-flume</sbt.project.name>
-  </properties>
-  <packaging>jar</packaging>
-  <name>Spark Project External Flume</name>
-  <url>http://spark.apache.org/</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      
<artifactId>spark-streaming-flume-sink_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.flume</groupId>
-      <artifactId>flume-ng-sdk</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.scalacheck</groupId>
-      <artifactId>scalacheck_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-    </dependency>
-
-    <!--
-      This spark-tags test-dep is needed even though it isn't used in this 
module, otherwise testing-cmds that exclude
-      them will yield errors.
-    -->
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-  </dependencies>
-  <build>
-    
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java
 
b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java
deleted file mode 100644
index 4e3420d..0000000
--- 
a/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.FlumeUtils;
-import org.apache.spark.streaming.flume.SparkFlumeEvent;
-
-/**
- *  Produces a count of events received from Flume.
- *
- *  This should be used in conjunction with an AvroSink in Flume. It will start
- *  an Avro server on at the request host:port address and listen for requests.
- *  Your Flume AvroSink should be pointed to this address.
- *
- *  Usage: JavaFlumeEventCount <host> <port>
- *    <host> is the host the Flume receiver will be started on - a receiver
- *           creates a server and listens for flume events.
- *    <port> is the port the Flume receiver will listen on.
- *
- *  To run this example:
- *     `$ bin/run-example 
org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
- */
-public final class JavaFlumeEventCount {
-  private JavaFlumeEventCount() {
-  }
-
-  public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      System.err.println("Usage: JavaFlumeEventCount <host> <port>");
-      System.exit(1);
-    }
-
-    String host = args[0];
-    int port = Integer.parseInt(args[1]);
-
-    Duration batchInterval = new Duration(2000);
-    SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
-    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
-    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
-      FlumeUtils.createStream(ssc, host, port);
-
-    flumeStream.count();
-
-    flumeStream.count().map(in -> "Received " + in + " flume events.").print();
-
-    ssc.start();
-    ssc.awaitTermination();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala 
b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala
deleted file mode 100644
index f877f79..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
-
-/**
- *  Produces a count of events received from Flume.
- *
- *  This should be used in conjunction with an AvroSink in Flume. It will start
- *  an Avro server on at the request host:port address and listen for requests.
- *  Your Flume AvroSink should be pointed to this address.
- *
- *  Usage: FlumeEventCount <host> <port>
- *    <host> is the host the Flume receiver will be started on - a receiver
- *           creates a server and listens for flume events.
- *    <port> is the port the Flume receiver will listen on.
- *
- *  To run this example:
- *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount 
<host> <port> `
- */
-object FlumeEventCount {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println(
-        "Usage: FlumeEventCount <host> <port>")
-      System.exit(1)
-    }
-
-    val Array(host, IntParam(port)) = args
-
-    val batchInterval = Milliseconds(2000)
-
-    // Create the context and set the batch size
-    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
-    val ssc = new StreamingContext(sparkConf, batchInterval)
-
-    // Create a flume stream
-    val stream = FlumeUtils.createStream(ssc, host, port, 
StorageLevel.MEMORY_ONLY_SER_2)
-
-    // Print out the count of events received from this server in each batch
-    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala
 
b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala
deleted file mode 100644
index 79a4027..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples.streaming
-
-import org.apache.spark.SparkConf
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.flume._
-import org.apache.spark.util.IntParam
-
-/**
- *  Produces a count of events received from Flume.
- *
- *  This should be used in conjunction with the Spark Sink running in a Flume 
agent. See
- *  the Spark Streaming programming guide for more details.
- *
- *  Usage: FlumePollingEventCount <host> <port>
- *    `host` is the host on which the Spark Sink is running.
- *    `port` is the port at which the Spark Sink is listening.
- *
- *  To run this example:
- *    `$ bin/run-example 
org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
- */
-object FlumePollingEventCount {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println(
-        "Usage: FlumePollingEventCount <host> <port>")
-      System.exit(1)
-    }
-
-    val Array(host, IntParam(port)) = args
-
-    val batchInterval = Milliseconds(2000)
-
-    // Create the context and set the batch size
-    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
-    val ssc = new StreamingContext(sparkConf, batchInterval)
-
-    // Create a flume stream that polls the Spark Sink running in a Flume agent
-    val stream = FlumeUtils.createPollingStream(ssc, host, port)
-
-    // Print out the count of events received from this server in each batch
-    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
deleted file mode 100644
index 07c5286..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{ObjectInput, ObjectOutput}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * A simple object that provides the implementation of readExternal and 
writeExternal for both
- * the wrapper classes for Flume-style Events.
- */
-private[streaming] object EventTransformer extends Logging {
-  def readExternal(in: ObjectInput): (java.util.HashMap[CharSequence, 
CharSequence],
-    Array[Byte]) = {
-    val bodyLength = in.readInt()
-    val bodyBuff = new Array[Byte](bodyLength)
-    in.readFully(bodyBuff)
-
-    val numHeaders = in.readInt()
-    val headers = new java.util.HashMap[CharSequence, CharSequence]
-
-    for (i <- 0 until numHeaders) {
-      val keyLength = in.readInt()
-      val keyBuff = new Array[Byte](keyLength)
-      in.readFully(keyBuff)
-      val key: String = Utils.deserialize(keyBuff)
-
-      val valLength = in.readInt()
-      val valBuff = new Array[Byte](valLength)
-      in.readFully(valBuff)
-      val value: String = Utils.deserialize(valBuff)
-
-      headers.put(key, value)
-    }
-    (headers, bodyBuff)
-  }
-
-  def writeExternal(out: ObjectOutput, headers: java.util.Map[CharSequence, 
CharSequence],
-    body: Array[Byte]) {
-    out.writeInt(body.length)
-    out.write(body)
-    val numHeaders = headers.size()
-    out.writeInt(numHeaders)
-    for ((k, v) <- headers.asScala) {
-      val keyBuff = Utils.serialize(k.toString)
-      out.writeInt(keyBuff.length)
-      out.write(keyBuff)
-      val valBuff = Utils.serialize(v.toString)
-      out.writeInt(valBuff.length)
-      out.write(valBuff)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
deleted file mode 100644
index 8af7c23..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume
-
-import scala.collection.mutable.ArrayBuffer
-
-import com.google.common.base.Throwables
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.flume.sink._
-
-/**
- * This class implements the core functionality of [[FlumePollingReceiver]]. 
When started it
- * pulls data from Flume, stores it to Spark and then sends an Ack or Nack. 
This class should be
- * run via a [[java.util.concurrent.Executor]] as this implements [[Runnable]]
- *
- * @param receiver The receiver that owns this instance.
- */
-
-private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends 
Runnable with
-  Logging {
-
-  def run(): Unit = {
-    while (!receiver.isStopped()) {
-      val connection = receiver.getConnections.poll()
-      val client = connection.client
-      var batchReceived = false
-      var seq: CharSequence = null
-      try {
-        getBatch(client) match {
-          case Some(eventBatch) =>
-            batchReceived = true
-            seq = eventBatch.getSequenceNumber
-            val events = toSparkFlumeEvents(eventBatch.getEvents)
-            if (store(events)) {
-              sendAck(client, seq)
-            } else {
-              sendNack(batchReceived, client, seq)
-            }
-          case None =>
-        }
-      } catch {
-        case e: Exception =>
-          Throwables.getRootCause(e) match {
-            // If the cause was an InterruptedException, then check if the 
receiver is stopped -
-            // if yes, just break out of the loop. Else send a Nack and log a 
warning.
-            // In the unlikely case, the cause was not an Exception,
-            // then just throw it out and exit.
-            case interrupted: InterruptedException =>
-              if (!receiver.isStopped()) {
-                logWarning("Interrupted while receiving data from Flume", 
interrupted)
-                sendNack(batchReceived, client, seq)
-              }
-            case exception: Exception =>
-              logWarning("Error while receiving data from Flume", exception)
-              sendNack(batchReceived, client, seq)
-          }
-      } finally {
-        receiver.getConnections.add(connection)
-      }
-    }
-  }
-
-  /**
-   * Gets a batch of events from the specified client. This method does not 
handle any exceptions
-   * which will be propagated to the caller.
-   * @param client Client to get events from
-   * @return [[Some]] which contains the event batch if Flume sent any events 
back, else [[None]]
-   */
-  private def getBatch(client: SparkFlumeProtocol.Callback): 
Option[EventBatch] = {
-    val eventBatch = client.getEventBatch(receiver.getMaxBatchSize)
-    if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
-      // No error, proceed with processing data
-      logDebug(s"Received batch of ${eventBatch.getEvents.size} events with 
sequence " +
-        s"number: ${eventBatch.getSequenceNumber}")
-      Some(eventBatch)
-    } else {
-      logWarning("Did not receive events from Flume agent due to error on the 
Flume agent: " +
-        eventBatch.getErrorMsg)
-      None
-    }
-  }
-
-  /**
-   * Store the events in the buffer to Spark. This method will not propagate 
any exceptions,
-   * but will propagate any other errors.
-   * @param buffer The buffer to store
-   * @return true if the data was stored without any exception being thrown, 
else false
-   */
-  private def store(buffer: ArrayBuffer[SparkFlumeEvent]): Boolean = {
-    try {
-      receiver.store(buffer)
-      true
-    } catch {
-      case e: Exception =>
-        logWarning("Error while attempting to store data received from Flume", 
e)
-        false
-    }
-  }
-
-  /**
-   * Send an ack to the client for the sequence number. This method does not 
handle any exceptions
-   * which will be propagated to the caller.
-   * @param client client to send the ack to
-   * @param seq sequence number of the batch to be ack-ed.
-   * @return
-   */
-  private def sendAck(client: SparkFlumeProtocol.Callback, seq: CharSequence): 
Unit = {
-    logDebug("Sending ack for sequence number: " + seq)
-    client.ack(seq)
-    logDebug("Ack sent for sequence number: " + seq)
-  }
-
-  /**
-   * This method sends a Nack if a batch was received to the client with the 
given sequence
-   * number. Any exceptions thrown by the RPC call is simply thrown out as is 
- no effort is made
-   * to handle it.
-   * @param batchReceived true if a batch was received. If this is false, no 
nack is sent
-   * @param client The client to which the nack should be sent
-   * @param seq The sequence number of the batch that is being nack-ed.
-   */
-  private def sendNack(batchReceived: Boolean, client: 
SparkFlumeProtocol.Callback,
-    seq: CharSequence): Unit = {
-    if (batchReceived) {
-      // Let Flume know that the events need to be pushed back into the 
channel.
-      logDebug("Sending nack for sequence number: " + seq)
-      client.nack(seq) // If the agent is down, even this could fail and throw
-      logDebug("Nack sent for sequence number: " + seq)
-    }
-  }
-
-  /**
-   * Utility method to convert [[SparkSinkEvent]]s to [[SparkFlumeEvent]]s
-   * @param events - Events to convert to SparkFlumeEvents
-   * @return - The SparkFlumeEvent generated from SparkSinkEvent
-   */
-  private def toSparkFlumeEvents(events: java.util.List[SparkSinkEvent]):
-    ArrayBuffer[SparkFlumeEvent] = {
-    // Convert each Flume event to a serializable SparkFlumeEvent
-    val buffer = new ArrayBuffer[SparkFlumeEvent](events.size())
-    var j = 0
-    while (j < events.size()) {
-      val event = events.get(j)
-      val sparkFlumeEvent = new SparkFlumeEvent()
-      sparkFlumeEvent.event.setBody(event.getBody)
-      sparkFlumeEvent.event.setHeaders(event.getHeaders)
-      buffer += sparkFlumeEvent
-      j += 1
-    }
-    buffer
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
deleted file mode 100644
index 13aa817..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{Externalizable, ObjectInput, ObjectOutput}
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-import java.util.concurrent.Executors
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.avro.ipc.NettyServer
-import org.apache.avro.ipc.specific.SpecificResponder
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol, 
Status}
-import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, 
Channels}
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
-
-private[streaming]
-class FlumeInputDStream[T: ClassTag](
-  _ssc: StreamingContext,
-  host: String,
-  port: Int,
-  storageLevel: StorageLevel,
-  enableDecompression: Boolean
-) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
-
-  override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumeReceiver(host, port, storageLevel, enableDecompression)
-  }
-}
-
-/**
- * A wrapper class for AvroFlumeEvent's with a custom serialization format.
- *
- * This is necessary because AvroFlumeEvent uses inner data structures
- * which are not serializable.
- */
-class SparkFlumeEvent() extends Externalizable {
-  var event: AvroFlumeEvent = new AvroFlumeEvent()
-
-  /* De-serialize from bytes. */
-  def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
-    val bodyLength = in.readInt()
-    val bodyBuff = new Array[Byte](bodyLength)
-    in.readFully(bodyBuff)
-
-    val numHeaders = in.readInt()
-    val headers = new java.util.HashMap[CharSequence, CharSequence]
-
-    for (i <- 0 until numHeaders) {
-      val keyLength = in.readInt()
-      val keyBuff = new Array[Byte](keyLength)
-      in.readFully(keyBuff)
-      val key: String = Utils.deserialize(keyBuff)
-
-      val valLength = in.readInt()
-      val valBuff = new Array[Byte](valLength)
-      in.readFully(valBuff)
-      val value: String = Utils.deserialize(valBuff)
-
-      headers.put(key, value)
-    }
-
-    event.setBody(ByteBuffer.wrap(bodyBuff))
-    event.setHeaders(headers)
-  }
-
-  /* Serialize to bytes. */
-  def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
-    val body = event.getBody
-    out.writeInt(body.remaining())
-    Utils.writeByteBuffer(body, out)
-
-    val numHeaders = event.getHeaders.size()
-    out.writeInt(numHeaders)
-    for ((k, v) <- event.getHeaders.asScala) {
-      val keyBuff = Utils.serialize(k.toString)
-      out.writeInt(keyBuff.length)
-      out.write(keyBuff)
-      val valBuff = Utils.serialize(v.toString)
-      out.writeInt(valBuff.length)
-      out.write(valBuff)
-    }
-  }
-}
-
-private[streaming] object SparkFlumeEvent {
-  def fromAvroFlumeEvent(in: AvroFlumeEvent): SparkFlumeEvent = {
-    val event = new SparkFlumeEvent
-    event.event = in
-    event
-  }
-}
-
-/** A simple server that implements Flume's Avro protocol. */
-private[streaming]
-class FlumeEventServer(receiver: FlumeReceiver) extends AvroSourceProtocol {
-  override def append(event: AvroFlumeEvent): Status = {
-    receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
-    Status.OK
-  }
-
-  override def appendBatch(events: java.util.List[AvroFlumeEvent]): Status = {
-    events.asScala.foreach(event => 
receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
-    Status.OK
-  }
-}
-
-/**
- * A NetworkReceiver which listens for events using the
- * Flume Avro interface.
- */
-private[streaming]
-class FlumeReceiver(
-    host: String,
-    port: Int,
-    storageLevel: StorageLevel,
-    enableDecompression: Boolean
-  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
-
-  lazy val responder = new SpecificResponder(
-    classOf[AvroSourceProtocol], new FlumeEventServer(this))
-  var server: NettyServer = null
-
-  private def initServer() = {
-    if (enableDecompression) {
-      val channelFactory = new 
NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
-                                                             
Executors.newCachedThreadPool())
-      val channelPipelineFactory = new CompressionChannelPipelineFactory()
-
-      new NettyServer(
-        responder,
-        new InetSocketAddress(host, port),
-        channelFactory,
-        channelPipelineFactory,
-        null)
-    } else {
-      new NettyServer(responder, new InetSocketAddress(host, port))
-    }
-  }
-
-  def onStart() {
-    synchronized {
-      if (server == null) {
-        server = initServer()
-        server.start()
-      } else {
-        logWarning("Flume receiver being asked to start more then once with 
out close")
-      }
-    }
-    logInfo("Flume receiver started")
-  }
-
-  def onStop() {
-    synchronized {
-      if (server != null) {
-        server.close()
-        server = null
-      }
-    }
-    logInfo("Flume receiver stopped")
-  }
-
-  override def preferredLocation: Option[String] = Option(host)
-
-  /**
-   * A Netty Pipeline factory that will decompress incoming data from
-   * and the Netty client and compress data going back to the client.
-   *
-   * The compression on the return is required because Flume requires
-   * a successful response to indicate it can remove the event/batch
-   * from the configured channel
-   */
-  private[streaming]
-  class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
-    def getPipeline(): ChannelPipeline = {
-      val pipeline = Channels.pipeline()
-      val encoder = new ZlibEncoder(6)
-      pipeline.addFirst("deflater", encoder)
-      pipeline.addFirst("inflater", new ZlibDecoder())
-      pipeline
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
deleted file mode 100644
index d84e289..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.streaming.flume
-
-
-import java.net.InetSocketAddress
-import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.flume.sink._
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * A `ReceiverInputDStream` that can be used to read data from several Flume 
agents running
- * [[org.apache.spark.streaming.flume.sink.SparkSink]]s.
- * @param _ssc Streaming context that will execute this input stream
- * @param addresses List of addresses at which SparkSinks are listening
- * @param maxBatchSize Maximum size of a batch
- * @param parallelism Number of parallel connections to open
- * @param storageLevel The storage level to use.
- * @tparam T Class type of the object of this stream
- */
-private[streaming] class FlumePollingInputDStream[T: ClassTag](
-    _ssc: StreamingContext,
-    val addresses: Seq[InetSocketAddress],
-    val maxBatchSize: Int,
-    val parallelism: Int,
-    storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[SparkFlumeEvent](_ssc) {
-
-  override def getReceiver(): Receiver[SparkFlumeEvent] = {
-    new FlumePollingReceiver(addresses, maxBatchSize, parallelism, 
storageLevel)
-  }
-}
-
-private[streaming] class FlumePollingReceiver(
-    addresses: Seq[InetSocketAddress],
-    maxBatchSize: Int,
-    parallelism: Int,
-    storageLevel: StorageLevel
-  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
-
-  lazy val channelFactoryExecutor =
-    Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
-      setNameFormat("Flume Receiver Channel Thread - %d").build())
-
-  lazy val channelFactory =
-    new NioClientSocketChannelFactory(channelFactoryExecutor, 
channelFactoryExecutor)
-
-  lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
-    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver 
Thread - %d").build())
-
-  private lazy val connections = new LinkedBlockingQueue[FlumeConnection]()
-
-  override def onStart(): Unit = {
-    // Create the connections to each Flume agent.
-    addresses.foreach { host =>
-      val transceiver = new NettyTransceiver(host, channelFactory)
-      val client = 
SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-      connections.add(new FlumeConnection(transceiver, client))
-    }
-    for (i <- 0 until parallelism) {
-      logInfo("Starting Flume Polling Receiver worker threads..")
-      // Threads that pull data from Flume.
-      receiverExecutor.submit(new FlumeBatchFetcher(this))
-    }
-  }
-
-  override def onStop(): Unit = {
-    logInfo("Shutting down Flume Polling Receiver")
-    receiverExecutor.shutdown()
-    // Wait upto a minute for the threads to die
-    if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-      receiverExecutor.shutdownNow()
-    }
-    connections.asScala.foreach(_.transceiver.close())
-    channelFactory.releaseExternalResources()
-  }
-
-  private[flume] def getConnections: LinkedBlockingQueue[FlumeConnection] = {
-    this.connections
-  }
-
-  private[flume] def getMaxBatchSize: Int = {
-    this.maxBatchSize
-  }
-}
-
-/**
- * A wrapper around the transceiver and the Avro IPC API.
- * @param transceiver The transceiver to use for communication with Flume
- * @param client The client that the callbacks are received on.
- */
-private[flume] class FlumeConnection(val transceiver: NettyTransceiver,
-  val client: SparkFlumeProtocol.Callback)
-
-
-

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
deleted file mode 100644
index e8623b4..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.{InetSocketAddress, ServerSocket}
-import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.util.{List => JList}
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.commons.lang3.RandomUtils
-import org.apache.flume.source.avro
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils
-
-/**
- * Share codes for Scala and Python unit tests
- */
-private[flume] class FlumeTestUtils {
-
-  private var transceiver: NettyTransceiver = null
-
-  private val testPort: Int = findFreePort()
-
-  def getTestPort(): Int = testPort
-
-  /** Find a free port */
-  private def findFreePort(): Int = {
-    val candidatePort = RandomUtils.nextInt(1024, 65536)
-    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
-      val socket = new ServerSocket(trialPort)
-      socket.close()
-      (null, trialPort)
-    }, new SparkConf())._2
-  }
-
-  /** Send data to the flume receiver */
-  def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
-    val testAddress = new InetSocketAddress("localhost", testPort)
-
-    val inputEvents = input.asScala.map { item =>
-      val event = new AvroFlumeEvent
-      event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8)))
-      event.setHeaders(Collections.singletonMap("test", "header"))
-      event
-    }
-
-    // if last attempted transceiver had succeeded, close it
-    close()
-
-    // Create transceiver
-    transceiver = {
-      if (enableCompression) {
-        new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
-      } else {
-        new NettyTransceiver(testAddress)
-      }
-    }
-
-    // Create Avro client with the transceiver
-    val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], 
transceiver)
-    if (client == null) {
-      throw new AssertionError("Cannot create client")
-    }
-
-    // Send data
-    val status = client.appendBatch(inputEvents.asJava)
-    if (status != avro.Status.OK) {
-      throw new AssertionError("Sent events unsuccessfully")
-    }
-  }
-
-  def close(): Unit = {
-    if (transceiver != null) {
-      transceiver.close()
-      transceiver = null
-    }
-  }
-
-  /** Class to create socket channel with compression */
-  private class CompressionChannelFactory(compressionLevel: Int)
-    extends NioClientSocketChannelFactory {
-
-    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
-      val encoder = new ZlibEncoder(compressionLevel)
-      pipeline.addFirst("deflater", encoder)
-      pipeline.addFirst("inflater", new ZlibDecoder())
-      super.newChannel(pipeline)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
deleted file mode 100644
index 707193a..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.io.{ByteArrayOutputStream, DataOutputStream}
-import java.net.InetSocketAddress
-import java.util.{List => JList, Map => JMap}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.api.java.function.PairFunction
-import org.apache.spark.api.python.PythonRDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaPairDStream, 
JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-@deprecated("Deprecated without replacement", "2.3.0")
-object FlumeUtils {
-  private val DEFAULT_POLLING_PARALLELISM = 5
-  private val DEFAULT_POLLING_BATCH_SIZE = 1000
-
-  /**
-   * Create a input stream from a Flume source.
-   * @param ssc      StreamingContext object
-   * @param hostname Hostname of the slave machine to which the flume data 
will be sent
-   * @param port     Port of the slave machine to which the flume data will be 
sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def createStream (
-      ssc: StreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(ssc, hostname, port, storageLevel, false)
-  }
-
-  /**
-   * Create a input stream from a Flume source.
-   * @param ssc      StreamingContext object
-   * @param hostname Hostname of the slave machine to which the flume data 
will be sent
-   * @param port     Port of the slave machine to which the flume data will be 
sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   * @param enableDecompression  should netty server decompress input stream
-   */
-  def createStream (
-      ssc: StreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel,
-      enableDecompression: Boolean
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    val inputStream = new FlumeInputDStream[SparkFlumeEvent](
-        ssc, hostname, port, storageLevel, enableDecompression)
-
-    inputStream
-  }
-
-  /**
-   * Creates a input stream from a Flume source.
-   * Storage level of the data will be the default 
StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param hostname Hostname of the slave machine to which the flume data 
will be sent
-   * @param port     Port of the slave machine to which the flume data will be 
sent
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port)
-  }
-
-  /**
-   * Creates a input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data 
will be sent
-   * @param port     Port of the slave machine to which the flume data will be 
sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel, false)
-  }
-
-  /**
-   * Creates a input stream from a Flume source.
-   * @param hostname Hostname of the slave machine to which the flume data 
will be sent
-   * @param port     Port of the slave machine to which the flume data will be 
sent
-   * @param storageLevel  Storage level to use for storing the received objects
-   * @param enableDecompression  should netty server decompress input stream
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel,
-      enableDecompression: Boolean
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed 
on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are 
available.
-   * This stream will use a batch size of 1000 events and run 5 threads to 
pull data.
-   * @param hostname Address of the host on which the Spark Sink is running
-   * @param port Port of the host at which the Spark Sink is listening
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      ssc: StreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), 
storageLevel)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed 
on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are 
available.
-   * This stream will use a batch size of 1000 events and run 5 threads to 
pull data.
-   * @param addresses List of InetSocketAddresses representing the hosts to 
connect to.
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      ssc: StreamingContext,
-      addresses: Seq[InetSocketAddress],
-      storageLevel: StorageLevel
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(ssc, addresses, storageLevel,
-      DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed 
on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are 
available.
-   * @param addresses List of InetSocketAddresses representing the hosts to 
connect to.
-   * @param maxBatchSize Maximum number of events to be pulled from the Spark 
sink in a
-   *                     single RPC call
-   * @param parallelism Number of concurrent requests this stream should send 
to the sink. Note
-   *                    that having a higher number of requests concurrently 
being pulled will
-   *                    result in this stream using more threads
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      ssc: StreamingContext,
-      addresses: Seq[InetSocketAddress],
-      storageLevel: StorageLevel,
-      maxBatchSize: Int,
-      parallelism: Int
-    ): ReceiverInputDStream[SparkFlumeEvent] = {
-    new FlumePollingInputDStream[SparkFlumeEvent](ssc, addresses, maxBatchSize,
-      parallelism, storageLevel)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed 
on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are 
available.
-   * This stream will use a batch size of 1000 events and run 5 threads to 
pull data.
-   * @param hostname Hostname of the host on which the Spark Sink is running
-   * @param port     Port of the host at which the Spark Sink is listening
-   */
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(jssc, hostname, port, 
StorageLevel.MEMORY_AND_DISK_SER_2)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed 
on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are 
available.
-   * This stream will use a batch size of 1000 events and run 5 threads to 
pull data.
-   * @param hostname     Hostname of the host on which the Spark Sink is 
running
-   * @param port         Port of the host at which the Spark Sink is listening
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), 
storageLevel)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed 
on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are 
available.
-   * This stream will use a batch size of 1000 events and run 5 threads to 
pull data.
-   * @param addresses    List of InetSocketAddresses on which the Spark Sink 
is running.
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      addresses: Array[InetSocketAddress],
-      storageLevel: StorageLevel
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(jssc, addresses, storageLevel,
-      DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM)
-  }
-
-  /**
-   * Creates an input stream that is to be used with the Spark Sink deployed 
on a Flume agent.
-   * This stream will poll the sink for data and will pull events as they are 
available.
-   * @param addresses    List of InetSocketAddresses on which the Spark Sink 
is running
-   * @param maxBatchSize The maximum number of events to be pulled from the 
Spark sink in a
-   *                     single RPC call
-   * @param parallelism  Number of concurrent requests this stream should send 
to the sink. Note
-   *                     that having a higher number of requests concurrently 
being pulled will
-   *                     result in this stream using more threads
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      addresses: Array[InetSocketAddress],
-      storageLevel: StorageLevel,
-      maxBatchSize: Int,
-      parallelism: Int
-    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
-    createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, 
parallelism)
-  }
-}
-
-/**
- * This is a helper class that wraps the methods in FlumeUtils into more 
Python-friendly class and
- * function so that it can be easily instantiated and called from Python's 
FlumeUtils.
- */
-private[flume] class FlumeUtilsPythonHelper {
-
-  def createStream(
-      jssc: JavaStreamingContext,
-      hostname: String,
-      port: Int,
-      storageLevel: StorageLevel,
-      enableDecompression: Boolean
-    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
-    val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, 
enableDecompression)
-    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
-  }
-
-  def createPollingStream(
-      jssc: JavaStreamingContext,
-      hosts: JList[String],
-      ports: JList[Int],
-      storageLevel: StorageLevel,
-      maxBatchSize: Int,
-      parallelism: Int
-    ): JavaPairDStream[Array[Byte], Array[Byte]] = {
-    assert(hosts.size() == ports.size())
-    val addresses = hosts.asScala.zip(ports.asScala).map {
-      case (host, port) => new InetSocketAddress(host, port)
-    }
-    val dstream = FlumeUtils.createPollingStream(
-      jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
-    FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
-  }
-
-}
-
-private object FlumeUtilsPythonHelper {
-
-  private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): 
Array[Byte] = {
-    val byteStream = new ByteArrayOutputStream()
-    val output = new DataOutputStream(byteStream)
-    try {
-      output.writeInt(map.size)
-      map.asScala.foreach { kv =>
-        PythonRDD.writeUTF(kv._1.toString, output)
-        PythonRDD.writeUTF(kv._2.toString, output)
-      }
-      byteStream.toByteArray
-    }
-    finally {
-      output.close()
-    }
-  }
-
-  private def toByteArrayPairDStream(dstream: 
JavaReceiverInputDStream[SparkFlumeEvent]):
-    JavaPairDStream[Array[Byte], Array[Byte]] = {
-    dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], 
Array[Byte]] {
-      override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], 
Array[Byte]) = {
-        val event = sparkEvent.event
-        val byteBuffer = event.getBody
-        val body = new Array[Byte](byteBuffer.remaining())
-        byteBuffer.get(body)
-        (stringMapToByteArray(event.getHeaders), body)
-      }
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
deleted file mode 100644
index a3e784a..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.nio.charset.StandardCharsets
-import java.util.{Collections, List => JList, Map => JMap}
-import java.util.concurrent._
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.conf.Configurables
-import org.apache.flume.event.EventBuilder
-
-import org.apache.spark.streaming.flume.sink.{SparkSink, SparkSinkConfig}
-
-/**
- * Share codes for Scala and Python unit tests
- */
-private[flume] class PollingFlumeTestUtils {
-
-  private val batchCount = 5
-  val eventsPerBatch = 100
-  private val totalEventsPerChannel = batchCount * eventsPerBatch
-  private val channelCapacity = 5000
-
-  def getTotalEvents: Int = totalEventsPerChannel * channels.size
-
-  private val channels = new ArrayBuffer[MemoryChannel]
-  private val sinks = new ArrayBuffer[SparkSink]
-
-  /**
-   * Start a sink and return the port of this sink
-   */
-  def startSingleSink(): Int = {
-    channels.clear()
-    sinks.clear()
-
-    // Start the channel and sink.
-    val context = new Context()
-    context.put("capacity", channelCapacity.toString)
-    context.put("transactionCapacity", "1000")
-    context.put("keep-alive", "0")
-    val channel = new MemoryChannel()
-    Configurables.configure(channel, context)
-
-    val sink = new SparkSink()
-    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
-    Configurables.configure(sink, context)
-    sink.setChannel(channel)
-    sink.start()
-
-    channels += (channel)
-    sinks += sink
-
-    sink.getPort()
-  }
-
-  /**
-   * Start 2 sinks and return the ports
-   */
-  def startMultipleSinks(): Seq[Int] = {
-    channels.clear()
-    sinks.clear()
-
-    // Start the channel and sink.
-    val context = new Context()
-    context.put("capacity", channelCapacity.toString)
-    context.put("transactionCapacity", "1000")
-    context.put("keep-alive", "0")
-    val channel = new MemoryChannel()
-    Configurables.configure(channel, context)
-
-    val channel2 = new MemoryChannel()
-    Configurables.configure(channel2, context)
-
-    val sink = new SparkSink()
-    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
-    Configurables.configure(sink, context)
-    sink.setChannel(channel)
-    sink.start()
-
-    val sink2 = new SparkSink()
-    context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
-    context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
-    Configurables.configure(sink2, context)
-    sink2.setChannel(channel2)
-    sink2.start()
-
-    sinks += sink
-    sinks += sink2
-    channels += channel
-    channels += channel2
-
-    sinks.map(_.getPort())
-  }
-
-  /**
-   * Send data and wait until all data has been received
-   */
-  def sendDataAndEnsureAllDataHasBeenReceived(): Unit = {
-    val executor = Executors.newCachedThreadPool()
-    val executorCompletion = new ExecutorCompletionService[Void](executor)
-
-    val latch = new CountDownLatch(batchCount * channels.size)
-    sinks.foreach(_.countdownWhenBatchReceived(latch))
-
-    channels.foreach { channel =>
-      executorCompletion.submit(new TxnSubmitter(channel))
-    }
-
-    for (i <- 0 until channels.size) {
-      executorCompletion.take()
-    }
-
-    latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
-  }
-
-  /**
-   * A Python-friendly method to assert the output
-   */
-  def assertOutput(
-      outputHeaders: JList[JMap[String, String]], outputBodies: 
JList[String]): Unit = {
-    require(outputHeaders.size == outputBodies.size)
-    val eventSize = outputHeaders.size
-    if (eventSize != totalEventsPerChannel * channels.size) {
-      throw new AssertionError(
-        s"Expected ${totalEventsPerChannel * channels.size} events, but was 
$eventSize")
-    }
-    var counter = 0
-    for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
-      val eventBodyToVerify = s"${channels(k).getName}-$i"
-      val eventHeaderToVerify: JMap[String, String] = 
Collections.singletonMap(s"test-$i", "header")
-      var found = false
-      var j = 0
-      while (j < eventSize && !found) {
-        if (eventBodyToVerify == outputBodies.get(j) &&
-          eventHeaderToVerify == outputHeaders.get(j)) {
-          found = true
-          counter += 1
-        }
-        j += 1
-      }
-    }
-    if (counter != totalEventsPerChannel * channels.size) {
-      throw new AssertionError(
-        s"111 Expected ${totalEventsPerChannel * channels.size} events, but 
was $counter")
-    }
-  }
-
-  def assertChannelsAreEmpty(): Unit = {
-    channels.foreach(assertChannelIsEmpty)
-  }
-
-  private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
-    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
-    queueRemaining.setAccessible(true)
-    val m = 
queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
-    if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != 
channelCapacity) {
-      throw new AssertionError(s"Channel ${channel.getName} is not empty")
-    }
-  }
-
-  def close(): Unit = {
-    sinks.foreach(_.stop())
-    sinks.clear()
-    channels.foreach(_.stop())
-    channels.clear()
-  }
-
-  private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
-    override def call(): Void = {
-      var t = 0
-      for (i <- 0 until batchCount) {
-        val tx = channel.getTransaction
-        tx.begin()
-        for (j <- 0 until eventsPerBatch) {
-          channel.put(EventBuilder.withBody(
-            s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8),
-            Collections.singletonMap(s"test-$t", "header")))
-          t += 1
-        }
-        tx.commit()
-        tx.close()
-        Thread.sleep(500) // Allow some time for the events to reach
-      }
-      null
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
deleted file mode 100644
index 4a5da22..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Spark streaming receiver for Flume.
- */
-package org.apache.spark.streaming.flume;

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala 
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
deleted file mode 100644
index 9bfab68..0000000
--- 
a/external/flume/src/main/scala/org/apache/spark/streaming/flume/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Spark streaming receiver for Flume.
- */
-package object flume

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
 
b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a..0000000
--- 
a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
-    protected transient JavaStreamingContext ssc;
-
-    @Before
-    public void setUp() {
-        SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
-            .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
-        ssc = new JavaStreamingContext(conf, new Duration(1000));
-        ssc.checkpoint("checkpoint");
-    }
-
-    @After
-    public void tearDown() {
-        ssc.stop();
-        ssc = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
 
b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
deleted file mode 100644
index 79c5b91..0000000
--- 
a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import java.net.InetSocketAddress;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
-  @Test
-  public void testFlumeStream() {
-    // tests the API, does not actually test data receiving
-    InetSocketAddress[] addresses = new InetSocketAddress[] {
-        new InetSocketAddress("localhost", 12345)
-    };
-    JavaReceiverInputDStream<SparkFlumeEvent> test1 =
-        FlumeUtils.createPollingStream(ssc, "localhost", 12345);
-    JavaReceiverInputDStream<SparkFlumeEvent> test2 = 
FlumeUtils.createPollingStream(
-        ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumeEvent> test3 = 
FlumeUtils.createPollingStream(
-        ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumeEvent> test4 = 
FlumeUtils.createPollingStream(
-        ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
 
b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
deleted file mode 100644
index ada05f2..0000000
--- 
a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
-  @Test
-  public void testFlumeStream() {
-    // tests the API, does not actually test data receiving
-    JavaReceiverInputDStream<SparkFlumeEvent> test1 = 
FlumeUtils.createStream(ssc, "localhost",
-      12345);
-    JavaReceiverInputDStream<SparkFlumeEvent> test2 = 
FlumeUtils.createStream(ssc, "localhost",
-      12345, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<SparkFlumeEvent> test3 = 
FlumeUtils.createStream(ssc, "localhost",
-      12345, StorageLevel.MEMORY_AND_DISK_SER_2(), false);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/flume/src/test/resources/log4j.properties 
b/external/flume/src/test/resources/log4j.properties
deleted file mode 100644
index fd51f8f..0000000
--- a/external/flume/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p 
%c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark_project.jetty=WARN
-

http://git-wip-us.apache.org/repos/asf/spark/blob/a0018141/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
----------------------------------------------------------------------
diff --git 
a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
 
b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
deleted file mode 100644
index c97a27c..0000000
--- 
a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
-import org.apache.spark.util.Utils
-
-/**
- * This is a output stream just for the testsuites. All the output is 
collected into a
- * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
- *
- * The buffer contains a sequence of RDD's, each containing a sequence of items
- */
-class TestOutputStream[T: ClassTag](parent: DStream[T],
-    val output: ConcurrentLinkedQueue[Seq[T]] = new 
ConcurrentLinkedQueue[Seq[T]]())
-  extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
-    val collected = rdd.collect()
-    output.add(collected)
-  }, false) {
-
-  // This is to clear the output buffer every it is read from a checkpoint
-  @throws(classOf[IOException])
-  private def readObject(ois: ObjectInputStream): Unit = 
Utils.tryOrIOException {
-    ois.defaultReadObject()
-    output.clear()
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to