Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 328063f5a -> 23fa19c7c


GEARPUMP-24, refactor DataSource API

Author: manuzhang <[email protected]>

Closes #7 from manuzhang/data_source.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/23fa19c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/23fa19c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/23fa19c7

Branch: refs/heads/master
Commit: 23fa19c7c9b8ed4c6e3d3936a77fd0beb272cf35
Parents: 328063f
Author: manuzhang <[email protected]>
Authored: Thu May 5 08:58:56 2016 +0800
Committer: huafengw <[email protected]>
Committed: Thu May 5 08:58:56 2016 +0800

----------------------------------------------------------------------
 .../gearpump/streaming/kafka/KafkaSource.scala  | 19 +++------
 .../kafka/lib/consumer/FetchThread.scala        |  2 +-
 .../streaming/kafka/KafkaSourceSpec.scala       | 41 +++++++-------------
 .../io/gearpump/streaming/dsl/StreamApp.scala   | 21 ++++------
 .../streaming/dsl/plan/OpTranslator.scala       |  8 ++--
 .../gearpump/streaming/source/DataSource.scala  | 25 ++++++------
 .../streaming/source/DataSourceTask.scala       | 12 +++---
 7 files changed, 53 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
index 3dede8e..1544445 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
@@ -55,6 +55,7 @@ object KafkaSource {
  * such that obsolete messages are dropped.
  *
  * @param config kafka source config
+ * @param offsetStorageFactory factory to build [[OffsetStorage]]
  * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
  * @param timestampFilter filters out message based on timestamp
  * @param fetchThread fetches messages and puts on a in-memory queue
@@ -152,7 +153,7 @@ class KafkaSource(
     }
   }
 
-  override def open(context: TaskContext, startTime: Option[TimeStamp]): Unit = {
+  override def open(context: TaskContext, startTime: TimeStamp): Unit = {
     import context.{appId, appName, parallelism, taskId}
 
     val topics = config.getConsumerTopics
@@ -168,21 +169,11 @@ class KafkaSource(
       tp -> new KafkaOffsetManager(storage)
     }.toMap
 
-    setStartTime(startTime)
+    setStartTime(Option(startTime))
   }
 
-  override def read(batchSize: Int): List[Message] = {
-    val messageBuffer = ArrayBuffer.empty[Message]
-
-    fetchThread.foreach {
-      fetch =>
-        var count = 0
-        while (count < batchSize) {
-          fetch.poll.flatMap(filterMessage).foreach(messageBuffer += _)
-          count += 1
-        }
-    }
-    messageBuffer.toList
+  override def read(): Message = {
+    fetchThread.flatMap(_.poll.flatMap(filterMessage)).orNull
   }
 
   private def filterMessage(kafkaMsg: KafkaMessage): Option[Message] = {

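The core change in this file: read() now returns a single Message per call, with null meaning nothing is currently available, instead of filling a fixed-size batch. Callers that still want batch semantics can rebuild them on top of the new contract; a minimal sketch (not part of this commit, using this repo's DataSource and Message types):

    import io.gearpump.Message
    import io.gearpump.streaming.source.DataSource

    // Drain up to batchSize messages, stopping early at the first null,
    // i.e. as soon as the source has nothing more to offer right now.
    def readBatch(source: DataSource, batchSize: Int): List[Message] = {
      Iterator.continually(source.read())
        .take(batchSize)
        .takeWhile(_ != null)
        .toList
    }
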
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
index b2b3f4f..8dbe145 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
@@ -88,7 +88,7 @@ private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
           }
           val hasMoreMessages = fetchMessage
           sleeper.reset()
-          if (!hasMoreMessages || incomingQueue.size >= fetchThreshold) {
+          if (!hasMoreMessages) {
             Thread.sleep(fetchSleepMS)
           }
         } catch {

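This hunk narrows the backoff condition: the thread now sleeps only when a fetch round made no progress, rather than also sleeping once the in-memory queue crosses fetchThreshold (presumably the threshold is enforced inside fetchMessage itself). The resulting pattern, as a self-contained sketch:

    // Poll a work function and back off only when it reports no progress.
    // fetchMessage is assumed to enqueue what it fetched and to respect
    // any queue threshold internally.
    def pollLoop(fetchMessage: () => Boolean, fetchSleepMS: Long): Unit = {
      while (!Thread.currentThread.isInterrupted) {
        val hasMoreMessages = fetchMessage()
        if (!hasMoreMessages) {
          Thread.sleep(fetchSleepMS)
        }
      }
    }
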
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
index 6cd78fc..7c804f7 100644
--- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala
@@ -111,8 +111,6 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
   }
 
   property("KafkaSource read should return number of messages in best effort") {
-    val numberGen = Gen.choose[Int](0, 1000)
-
     val kafkaMsgGen = for {
       topic <- Gen.alphaStr
       partition <- Gen.choose[Int](0, 1000)
@@ -120,43 +118,34 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
       key = None
       msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
     } yield KafkaMessage(TopicAndPartition(topic, partition), offset, key, msg)
-    val kafkaMsgListGen = Gen.listOf[KafkaMessage](kafkaMsgGen) suchThat (_.size > 0)
-    forAll(numberGen, kafkaMsgListGen) {
-      (number: Int, kafkaMsgList: List[KafkaMessage]) =>
+    val msgQueueGen = Gen.containerOf[Array, KafkaMessage](kafkaMsgGen)
+    forAll(msgQueueGen) {
+      (msgQueue: Array[KafkaMessage]) =>
         val offsetManager = mock[KafkaOffsetManager]
         val fetchThread = mock[FetchThread]
         val messageDecoder = mock[MessageDecoder]
-        val message = mock[Message]
 
         val timestampFilter = mock[TimeStampFilter]
         val offsetStorageFactory = mock[OffsetStorageFactory]
         val kafkaConfig = mock[KafkaSourceConfig]
-        val offsetManagers = kafkaMsgList.map(_.topicAndPartition -> offsetManager).toMap
+        val offsetManagers = msgQueue.map(_.topicAndPartition -> offsetManager).toMap
 
        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
           timestampFilter, Some(fetchThread), offsetManagers)
 
-        if (number == 0) {
-          verify(fetchThread, never()).poll
-          source.read(number).size shouldBe 0
+        if (msgQueue.isEmpty) {
+          when(fetchThread.poll).thenReturn(None)
+          source.read() shouldBe null
         } else {
-          kafkaMsgList match {
-            case Nil =>
-              if (number == 1) {
-                when(fetchThread.poll).thenReturn(None)
-              } else {
-                val nones = List.fill(number)(None)
-                when(fetchThread.poll).thenReturn(nones.head, nones.tail: _*)
-              }
-            case list =>
-              val queue = list.map(Option(_)) ++ List.fill(number - list.size)(None)
-              when(fetchThread.poll).thenReturn(queue.head, queue.tail: _*)
-              when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message)
-              when(offsetManager.filter(anyObject[(Message, Long)])).thenReturn(Some(message))
-              when(timestampFilter.filter(anyObject[Message], anyLong())).thenReturn(Some(message))
+          msgQueue.indices.foreach { i =>
+            val message = Message(msgQueue(i).msg)
+            when(fetchThread.poll).thenReturn(Option(msgQueue(i)))
+            when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message)
+            when(offsetManager.filter(anyObject[(Message, Long)])).thenReturn(Some(message))
+            when(timestampFilter.filter(anyObject[Message], anyLong())).thenReturn(Some(message))
+
+            source.read shouldBe message
           }
-          source.read(number).size shouldBe Math.min(number, kafkaMsgList.size)
-          verify(fetchThread, times(number)).poll
         }
         source.close()
     }

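The rewritten property re-stubs fetchThread.poll on every loop iteration so each read() sees exactly one queued message. An alternative in the old test's style, stubbing all consecutive poll() results up front with Mockito's varargs thenReturn (a sketch assuming the same mocks as above):

    // One stub covering the whole sequence: each element answers one poll(),
    // then None once the queue is exhausted.
    val polls = msgQueue.map(Option(_)).toList :+ None
    when(fetchThread.poll).thenReturn(polls.head, polls.tail: _*)
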
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
index 525000d..a2ac70f 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
@@ -108,24 +108,17 @@ object StreamApp {
 
 /** A test message source which generated message sequence repeatedly. */
 class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
-  val list = seq.toList
-  var index = 0
-
-  def readOne(): List[Message] = {
-    if (index < list.length) {
-      val element = List(Message(list(index).asInstanceOf[AnyRef]))
-      index += 1
-      element
+  private val iterator: Iterator[T] = seq.iterator
+
+  override def read(): Message = {
+    if (iterator.hasNext) {
+      Message(iterator.next())
     } else {
-      List.empty[Message]
+      null
     }
   }
 
-  override def read(batchSize: Int): List[Message] = {
-    readOne()
-  }
-
   override def close(): Unit = {}
 
-  override def open(context: TaskContext, startTime: Option[TimeStamp]): Unit = {}
+  override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
 }
\ No newline at end of file

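CollectionDataSource now walks its sequence once via an iterator and signals exhaustion with null, matching the new single-message contract. A quick illustrative usage (the msg field access follows this repo's Message; the assertions are for illustration only):

    val source = new CollectionDataSource(Seq("a", "b"))
    assert(source.read().msg == "a")  // first element
    assert(source.read().msg == "b")  // second element
    assert(source.read() == null)     // iterator exhausted
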
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
index f916124..11b4c34 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
@@ -217,14 +217,13 @@ object OpTranslator {
     }
 
     override def onStart(startTime: StartTime): Unit = {
-      source.open(taskContext, Some(startTime.startTime))
+      source.open(taskContext, startTime.startTime)
       self ! Message("start", System.currentTimeMillis())
     }
 
     override def onNext(msg: Message): Unit = {
       val time = System.currentTimeMillis()
-      // TODO: determine the batch size
-      source.read(1).foreach(msg => {
+      Option(source.read()).foreach { msg =>
         operator match {
           case Some(operator) =>
             operator match {
@@ -238,7 +237,8 @@ object OpTranslator {
           case None =>
             taskContext.output(msg)
         }
-      })
+      }
+
       self ! Message("next", System.currentTimeMillis())
     }
 

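Option(...) is what absorbs the null here: Option(null) evaluates to None, so a tick on which the source has nothing to emit simply produces no downstream output. For instance:

    // Option.apply maps null to None, so foreach becomes a no-op:
    assert(Option(null) == None)
    assert(Option("msg").contains("msg"))
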
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
index 9cb7ca0..e145079 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
@@ -19,7 +19,9 @@
 package io.gearpump.streaming.source
 
 import io.gearpump.streaming.task.TaskContext
-import io.gearpump.{Message, TimeStamp}
+import io.gearpump.Message
+
+import scala.util.Random
 
 /**
  * Interface to implement custom source where data is read into the system.
@@ -27,12 +29,12 @@ import io.gearpump.{Message, TimeStamp}
  *
  * An example would be like
  * {{{
- *  GenStringSource extends DataSource {
+ *  GenMsgSource extends DataSource {
  *
- *    def open(context: TaskContext, startTime: Option[TimeStamp]): Unit = {}
+ *    def open(context: TaskContext, startTime: TimeStamp): Unit = {}
  *
- *    def read(batchSize: Int): List[Message] = {
- *      List.fill(batchSize)(Message("message"))
+ *    def read(context: TaskContext): Message = {
+ *      Message("message")
  *    }
  *
  *    def close(): Unit = {}
@@ -46,18 +48,19 @@ trait DataSource extends java.io.Serializable {
   /**
    * Opens connection to data source
   * invoked in onStart() method of [[io.gearpump.streaming.source.DataSourceTask]]
+   *
    * @param context is the task context at runtime
    * @param startTime is the start time of system
    */
-  def open(context: TaskContext, startTime: Option[TimeStamp]): Unit
+  def open(context: TaskContext, startTime: Long): Unit
 
   /**
-   * Reads a number of messages from data source.
-   * invoked in each onNext() method of [[io.gearpump.streaming.source.DataSourceTask]]
-   * @param batchSize max number of messages to read
-   * @return a list of messages wrapped in [[io.gearpump.Message]]
+   * Reads next message from data source and
+   * returns null if no message is available
+   *
+   * @return a [[io.gearpump.Message]] or null
    */
-  def read(batchSize: Int): List[Message]
+  def read(): Message
 
   /**
    * Closes connection to data source.

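A minimal, self-contained implementation of the trait as now declared (class name and behavior are illustrative only, not part of this commit):

    import io.gearpump.Message
    import io.gearpump.streaming.source.DataSource
    import io.gearpump.streaming.task.TaskContext

    // Emits 0, 1, 2, ... one number per read(), stamped with wall-clock time.
    class CounterSource extends DataSource {
      private var count = 0L

      override def open(context: TaskContext, startTime: Long): Unit = {
        count = 0L
      }

      override def read(): Message = {
        val msg = Message(count.toString, System.currentTimeMillis())
        count += 1
        msg
      }

      override def close(): Unit = {}
    }
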
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
index d9b2110..eab74aa 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
@@ -27,14 +27,14 @@ object DataSourceTask {
 }
 
 /**
- * Task container for [[io.gearpump.streaming.source.DataSource]].
+ * Default Task container for [[io.gearpump.streaming.source.DataSource]] that
+ * reads from DataSource in batch
  * See [[io.gearpump.streaming.source.DataSourceProcessor]] for its usage
  *
  * DataSourceTask calls:
  *  - `DataSource.open()` in `onStart` and pass in [[io.gearpump.streaming.task.TaskContext]]
  * and application start time
- *  - `DataSource.read()` in each `onNext`, which reads a batch of messages whose size are
- * defined by `gearpump.source.read.batch.size`.
+ *  - `DataSource.read()` in each `onNext`, which reads a batch of messages
  *  - `DataSource.close()` in `onStop`
  */
 class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) {
@@ -47,12 +47,14 @@ class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(contex
   override def onStart(newStartTime: StartTime): Unit = {
     startTime = newStartTime.startTime
     LOG.info(s"opening data source at $startTime")
-    source.open(context, Some(startTime))
+    source.open(context, startTime)
     self ! Message("start", System.currentTimeMillis())
   }
 
   override def onNext(message: Message): Unit = {
-    source.read(batchSize).foreach(context.output)
+    0.until(batchSize).foreach { _ =>
+      Option(source.read()).foreach(context.output)
+    }
     self ! Message("continue", System.currentTimeMillis())
   }
 

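Framework-side batching survives the refactor: DataSourceTask still drains up to batchSize messages per onNext, it just skips nulls instead of asking the source to size the batch. Wiring a source into an application typically goes through DataSourceProcessor; a hedged sketch (the exact signature may differ from this repo's, and an implicit ActorSystem is assumed in scope):

    import akka.actor.ActorSystem
    import io.gearpump.streaming.source.DataSourceProcessor

    implicit val system: ActorSystem = ActorSystem("example")
    // CounterSource is the illustrative class from the DataSource sketch above.
    val sourceProcessor = DataSourceProcessor(new CounterSource, 1)
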