Repository: incubator-gearpump Updated Branches: refs/heads/master 328063f5a -> 23fa19c7c
GEARPUMP-24, refactor DataSource API Author: manuzhang <[email protected]> Closes #7 from manuzhang/data_source. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/23fa19c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/23fa19c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/23fa19c7 Branch: refs/heads/master Commit: 23fa19c7c9b8ed4c6e3d3936a77fd0beb272cf35 Parents: 328063f Author: manuzhang <[email protected]> Authored: Thu May 5 08:58:56 2016 +0800 Committer: huafengw <[email protected]> Committed: Thu May 5 08:58:56 2016 +0800 ---------------------------------------------------------------------- .../gearpump/streaming/kafka/KafkaSource.scala | 19 +++------ .../kafka/lib/consumer/FetchThread.scala | 2 +- .../streaming/kafka/KafkaSourceSpec.scala | 41 +++++++------------- .../io/gearpump/streaming/dsl/StreamApp.scala | 21 ++++------ .../streaming/dsl/plan/OpTranslator.scala | 8 ++-- .../gearpump/streaming/source/DataSource.scala | 25 ++++++------ .../streaming/source/DataSourceTask.scala | 12 +++--- 7 files changed, 53 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala index 3dede8e..1544445 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala @@ -55,6 +55,7 @@ object KafkaSource { * such that obsolete messages are dropped. * * @param config kafka source config + * @param offsetStorageFactory factory to build [[OffsetStorage]] * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes * @param timestampFilter filters out message based on timestamp * @param fetchThread fetches messages and puts on a in-memory queue @@ -152,7 +153,7 @@ class KafkaSource( } } - override def open(context: TaskContext, startTime: Option[TimeStamp]): Unit = { + override def open(context: TaskContext, startTime: TimeStamp): Unit = { import context.{appId, appName, parallelism, taskId} val topics = config.getConsumerTopics @@ -168,21 +169,11 @@ class KafkaSource( tp -> new KafkaOffsetManager(storage) }.toMap - setStartTime(startTime) + setStartTime(Option(startTime)) } - override def read(batchSize: Int): List[Message] = { - val messageBuffer = ArrayBuffer.empty[Message] - - fetchThread.foreach { - fetch => - var count = 0 - while (count < batchSize) { - fetch.poll.flatMap(filterMessage).foreach(messageBuffer += _) - count += 1 - } - } - messageBuffer.toList + override def read(): Message = { + fetchThread.flatMap(_.poll.flatMap(filterMessage)).orNull } private def filterMessage(kafkaMsg: KafkaMessage): Option[Message] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala index b2b3f4f..8dbe145 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala @@ -88,7 +88,7 @@ private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition], } val hasMoreMessages = fetchMessage sleeper.reset() - if (!hasMoreMessages || incomingQueue.size >= fetchThreshold) { + if (!hasMoreMessages) { Thread.sleep(fetchSleepMS) } } catch { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala index 6cd78fc..7c804f7 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala @@ -111,8 +111,6 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo } property("KafkaSource read should return number of messages in best effort") { - val numberGen = Gen.choose[Int](0, 1000) - val kafkaMsgGen = for { topic <- Gen.alphaStr partition <- Gen.choose[Int](0, 1000) @@ -120,43 +118,34 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo key = None msg <- Gen.alphaStr.map(Injection[String, Array[Byte]]) } yield KafkaMessage(TopicAndPartition(topic, partition), offset, key, msg) - val kafkaMsgListGen = Gen.listOf[KafkaMessage](kafkaMsgGen) suchThat (_.size > 0) - forAll(numberGen, kafkaMsgListGen) { - (number: Int, kafkaMsgList: List[KafkaMessage]) => + val msgQueueGen = Gen.containerOf[Array, KafkaMessage](kafkaMsgGen) + forAll(msgQueueGen) { + (msgQueue: Array[KafkaMessage]) => val offsetManager = mock[KafkaOffsetManager] val fetchThread = mock[FetchThread] val messageDecoder = mock[MessageDecoder] - val message = mock[Message] val timestampFilter = mock[TimeStampFilter] val offsetStorageFactory = mock[OffsetStorageFactory] val kafkaConfig = mock[KafkaSourceConfig] - val offsetManagers = kafkaMsgList.map(_.topicAndPartition -> offsetManager).toMap + val offsetManagers = msgQueue.map(_.topicAndPartition -> offsetManager).toMap val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter, Some(fetchThread), offsetManagers) - if (number == 0) { - verify(fetchThread, never()).poll - source.read(number).size shouldBe 0 + if (msgQueue.isEmpty) { + when(fetchThread.poll).thenReturn(None) + source.read() shouldBe null } else { - kafkaMsgList match { - case Nil => - if (number == 1) { - when(fetchThread.poll).thenReturn(None) - } else { - val nones = List.fill(number)(None) - when(fetchThread.poll).thenReturn(nones.head, nones.tail: _*) - } - case list => - val queue = list.map(Option(_)) ++ List.fill(number - list.size)(None) - when(fetchThread.poll).thenReturn(queue.head, queue.tail: _*) - when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message) - when(offsetManager.filter(anyObject[(Message, Long)])).thenReturn(Some(message)) - when(timestampFilter.filter(anyObject[Message], anyLong())).thenReturn(Some(message)) + msgQueue.indices.foreach { i => + val message = Message(msgQueue(i).msg) + when(fetchThread.poll).thenReturn(Option(msgQueue(i))) + when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message) + when(offsetManager.filter(anyObject[(Message, Long)])).thenReturn(Some(message)) + when(timestampFilter.filter(anyObject[Message], anyLong())).thenReturn(Some(message)) + + source.read shouldBe message } - source.read(number).size shouldBe Math.min(number, kafkaMsgList.size) - verify(fetchThread, times(number)).poll } source.close() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala index 525000d..a2ac70f 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala @@ -108,24 +108,17 @@ object StreamApp { /** A test message source which generated message sequence repeatedly. */ class CollectionDataSource[T](seq: Seq[T]) extends DataSource { - val list = seq.toList - var index = 0 - - def readOne(): List[Message] = { - if (index < list.length) { - val element = List(Message(list(index).asInstanceOf[AnyRef])) - index += 1 - element + private val iterator: Iterator[T] = seq.iterator + + override def read(): Message = { + if (iterator.hasNext) { + Message(iterator.next()) } else { - List.empty[Message] + null } } - override def read(batchSize: Int): List[Message] = { - readOne() - } - override def close(): Unit = {} - override def open(context: TaskContext, startTime: Option[TimeStamp]): Unit = {} + override def open(context: TaskContext, startTime: TimeStamp): Unit = {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala index f916124..11b4c34 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala @@ -217,14 +217,13 @@ object OpTranslator { } override def onStart(startTime: StartTime): Unit = { - source.open(taskContext, Some(startTime.startTime)) + source.open(taskContext, startTime.startTime) self ! Message("start", System.currentTimeMillis()) } override def onNext(msg: Message): Unit = { val time = System.currentTimeMillis() - // TODO: determine the batch size - source.read(1).foreach(msg => { + Option(source.read()).foreach { msg => operator match { case Some(operator) => operator match { @@ -238,7 +237,8 @@ object OpTranslator { case None => taskContext.output(msg) } - }) + } + self ! Message("next", System.currentTimeMillis()) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala index 9cb7ca0..e145079 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala @@ -19,7 +19,9 @@ package io.gearpump.streaming.source import io.gearpump.streaming.task.TaskContext -import io.gearpump.{Message, TimeStamp} +import io.gearpump.Message + +import scala.util.Random /** * Interface to implement custom source where data is read into the system. @@ -27,12 +29,12 @@ import io.gearpump.{Message, TimeStamp} * * An example would be like * {{{ - * GenStringSource extends DataSource { + * GenMsgSource extends DataSource { * - * def open(context: TaskContext, startTime: Option[TimeStamp]): Unit = {} + * def open(context: TaskContext, startTime: TimeStamp): Unit = {} * - * def read(batchSize: Int): List[Message] = { - * List.fill(batchSize)(Message("message")) + * def read(context: TaskContext): Message = { + * Message("message") * } * * def close(): Unit = {} @@ -46,18 +48,19 @@ trait DataSource extends java.io.Serializable { /** * Opens connection to data source * invoked in onStart() method of [[io.gearpump.streaming.source.DataSourceTask]] + * * @param context is the task context at runtime * @param startTime is the start time of system */ - def open(context: TaskContext, startTime: Option[TimeStamp]): Unit + def open(context: TaskContext, startTime: Long): Unit /** - * Reads a number of messages from data source. - * invoked in each onNext() method of [[io.gearpump.streaming.source.DataSourceTask]] - * @param batchSize max number of messages to read - * @return a list of messages wrapped in [[io.gearpump.Message]] + * Reads next message from data source and + * returns null if no message is available + * + * @return a [[io.gearpump.Message]] or null */ - def read(batchSize: Int): List[Message] + def read(): Message /** * Closes connection to data source. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23fa19c7/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala index d9b2110..eab74aa 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala @@ -27,14 +27,14 @@ object DataSourceTask { } /** - * Task container for [[io.gearpump.streaming.source.DataSource]]. + * Default Task container for [[io.gearpump.streaming.source.DataSource]] that + * reads from DataSource in batch * See [[io.gearpump.streaming.source.DataSourceProcessor]] for its usage * * DataSourceTask calls: * - `DataSource.open()` in `onStart` and pass in [[io.gearpump.streaming.task.TaskContext]] * and application start time - * - `DataSource.read()` in each `onNext`, which reads a batch of messages whose size are - * defined by `gearpump.source.read.batch.size`. + * - `DataSource.read()` in each `onNext`, which reads a batch of messages * - `DataSource.close()` in `onStop` */ class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) { @@ -47,12 +47,14 @@ class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(contex override def onStart(newStartTime: StartTime): Unit = { startTime = newStartTime.startTime LOG.info(s"opening data source at $startTime") - source.open(context, Some(startTime)) + source.open(context, startTime) self ! Message("start", System.currentTimeMillis()) } override def onNext(message: Message): Unit = { - source.read(batchSize).foreach(context.output) + 0.until(batchSize).foreach { _ => + Option(source.read()).foreach(context.output) + } self ! Message("continue", System.currentTimeMillis()) }
