fix GEARPUMP-122, refactor kafka connector API

This PR mainly refactors the Kafka connector API with the following changes:

1. removes `OffsetStorage` and `KafkaStorage`, and uses `CheckpointStore` and 
`KafkaStore` for both offset and state checkpoint stores.
2. adds a `checkpoint(CheckpointStoreFactory)` method to `TimeReplayableSource` 
which makes the offset store configurable by the user. If `checkpoint` is not 
called, then no offset checkpoint will be performed and applications run in 
*at-most-once* mode.
3. moves user-facing APIs, `KafkaSource`, `KafkaSink` and `KafkaStore`, to Java 
and keeps their implementations in Scala, which prevents Scala package-private 
methods from being exposed in Java programs.
4. `KafkaSource`, `KafkaSink` and `KafkaStore` are all configurable through 
Java properties, which is the standard Kafka way.
5. updates corresponding UTs.

Author: manuzhang <[email protected]>

Closes #25 from manuzhang/kafka.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/04c3975d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/04c3975d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/04c3975d

Branch: refs/heads/master
Commit: 04c3975d6bdcaa083887ceb633b484e01d041bea
Parents: 32f1507
Author: manuzhang <[email protected]>
Authored: Wed Jun 1 16:53:58 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Jun 1 16:53:58 2016 +0800

----------------------------------------------------------------------
 .../examples/kafka/KafkaReadWrite.scala         |  19 +-
 .../kafka/wordcount/KafkaWordCount.scala        |  26 +-
 .../examples/state/MessageCountApp.scala        |  20 +-
 .../examples/wordcountjava/WordCount.java       |   1 +
 .../hadoop/HadoopCheckpointStoreFactory.scala   |  14 +-
 .../HadoopCheckpointStoreIntegrationSpec.scala  |   5 +-
 .../gearpump/streaming/kafka/KafkaSink.java     |  41 +++
 .../gearpump/streaming/kafka/KafkaSource.java   |  54 ++++
 .../streaming/kafka/KafkaStoreFactory.java      |  40 +++
 .../streaming/kafka/util/KafkaConfig.java       | 264 ++++++++++++++++++
 .../gearpump/streaming/kafka/KafkaSink.scala    |  80 ------
 .../gearpump/streaming/kafka/KafkaSource.scala  | 195 -------------
 .../gearpump/streaming/kafka/KafkaStorage.scala | 148 ----------
 .../streaming/kafka/dsl/KafkaDSLSink.scala      |  15 +-
 .../streaming/kafka/dsl/KafkaDSLUtil.scala      |  55 +---
 .../kafka/lib/DefaultMessageDecoder.scala       |  41 ---
 .../kafka/lib/KafkaOffsetManager.scala          |  66 -----
 .../streaming/kafka/lib/KafkaSourceConfig.scala | 178 ------------
 .../streaming/kafka/lib/KafkaUtil.scala         | 167 ------------
 .../consumer/ExponentialBackoffSleeper.scala    |  55 ----
 .../kafka/lib/consumer/FetchThread.scala        | 139 ----------
 .../kafka/lib/consumer/KafkaConsumer.scala      | 103 -------
 .../kafka/lib/consumer/KafkaMessage.scala       |  38 ---
 .../kafka/lib/grouper/KafkaDefaultGrouper.scala |  38 ---
 .../kafka/lib/grouper/KafkaGrouper.scala        |  30 --
 .../kafka/lib/sink/AbstractKafkaSink.scala      |  92 +++++++
 .../kafka/lib/source/AbstractKafkaSource.scala  | 173 ++++++++++++
 .../lib/source/DefaultMessageDecoder.scala      |  38 +++
 .../consumer/ExponentialBackoffSleeper.scala    |  59 ++++
 .../kafka/lib/source/consumer/FetchThread.scala | 164 +++++++++++
 .../lib/source/consumer/KafkaConsumer.scala     |  94 +++++++
 .../lib/source/consumer/KafkaMessage.scala      |  38 +++
 .../grouper/DefaultPartitionGrouper.scala       |  38 +++
 .../lib/source/grouper/PartitionGrouper.scala   |  30 ++
 .../streaming/kafka/lib/store/KafkaStore.scala  | 127 +++++++++
 .../streaming/kafka/lib/util/KafkaClient.scala  | 121 +++++++++
 .../streaming/kafka/KafkaSinkSpec.scala         |  25 +-
 .../streaming/kafka/KafkaSourceSpec.scala       | 272 +++++++++++--------
 .../streaming/kafka/KafkaStoreSpec.scala        | 169 ++++++++++++
 .../kafka/lib/DefaultMessageDecoderSpec.scala   |  44 ---
 .../kafka/lib/KafkaOffsetManagerSpec.scala      | 117 --------
 .../streaming/kafka/lib/KafkaStorageSpec.scala  | 187 -------------
 .../streaming/kafka/lib/KafkaUtilSpec.scala     | 107 --------
 .../ExponentialBackoffSleeperSpec.scala         |  68 -----
 .../kafka/lib/consumer/FetchThreadSpec.scala    | 113 --------
 .../kafka/lib/consumer/KafkaConsumerSpec.scala  |  88 ------
 .../lib/grouper/KafkaDefaultGrouperSpec.scala   |  42 ---
 .../lib/source/DefaultMessageDecoderSpec.scala  |  52 ++++
 .../ExponentialBackoffSleeperSpec.scala         |  68 +++++
 .../lib/source/consumer/FetchThreadSpec.scala   | 159 +++++++++++
 .../lib/source/consumer/KafkaConsumerSpec.scala |  88 ++++++
 .../grouper/DefaultPartitionGrouperSpec.scala   |  42 +++
 .../kafka/lib/util/KafkaClientSpec.scala        | 139 ++++++++++
 .../kafka/util/KafkaServerHarness.scala         |  19 +-
 .../streaming/kafka/util/ZookeeperHarness.scala |   8 +-
 .../streaming/state/api/PersistentTask.scala    |   3 +-
 .../state/impl/InMemoryCheckpointStore.scala    |   2 +-
 .../transaction/api/CheckpointStore.scala       |   9 +-
 .../transaction/api/MessageDecoder.scala        |  11 +-
 .../transaction/api/OffsetManager.scala         |  44 ---
 .../transaction/api/OffsetStorage.scala         |  66 -----
 .../transaction/api/TimeReplayableSource.scala  |   8 +-
 62 files changed, 2373 insertions(+), 2383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
index 364544b..cfeef5b 100644
--- 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala
@@ -18,7 +18,10 @@
 
 package org.apache.gearpump.streaming.examples.kafka
 
+import java.util.Properties
+
 import akka.actor.ActorSystem
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
 import org.slf4j.Logger
 
 import org.apache.gearpump.cluster.UserConfig
@@ -26,7 +29,7 @@ import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
 import org.apache.gearpump.partitioner.ShufflePartitioner
 import org.apache.gearpump.streaming.StreamApplication
-import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
+import org.apache.gearpump.streaming.kafka._
 import org.apache.gearpump.streaming.sink.DataSinkProcessor
 import org.apache.gearpump.streaming.source.DataSourceProcessor
 import org.apache.gearpump.util.Graph._
@@ -52,6 +55,7 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser {
 
   def application(config: ParseResult, system: ActorSystem): StreamApplication 
= {
     implicit val actorSystem = system
+    val appName = "KafkaReadWrite"
     val sourceNum = config.getInt("source")
     val sinkNum = config.getInt("sink")
     val zookeeperConnect = config.getString("zookeeperConnect")
@@ -60,14 +64,19 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser {
     val sinkTopic = config.getString("sinkTopic")
 
     val appConfig = UserConfig.empty
-    val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, 
brokerList)
-    val source = new KafkaSource(sourceTopic, zookeeperConnect, 
offsetStorageFactory)
+    val props = new Properties
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
+    val source = new KafkaSource(sourceTopic, props)
+    val checkpointStoreFactory = new KafkaStoreFactory(props)
+    source.setCheckpointStore(checkpointStoreFactory)
     val sourceProcessor = DataSourceProcessor(source, sourceNum)
-    val sink = new KafkaSink(sinkTopic, brokerList)
+    val sink = new KafkaSink(sinkTopic, props)
     val sinkProcessor = DataSinkProcessor(sink, sinkNum)
     val partitioner = new ShufflePartitioner
     val computation = sourceProcessor ~ partitioner ~> sinkProcessor
-    val app = StreamApplication("KafkaReadWrite", Graph(computation), 
appConfig)
+    val app = StreamApplication(appName, Graph(computation), appConfig)
     app
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
index 5ef1e67..aa9842f 100644
--- 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
@@ -18,16 +18,18 @@
 
 package org.apache.gearpump.streaming.examples.kafka.wordcount
 
+import java.util.Properties
+
 import akka.actor.ActorSystem
 import kafka.api.OffsetRequest
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
 import org.slf4j.Logger
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
 import org.apache.gearpump.partitioner.HashPartitioner
-import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig
-import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
+import org.apache.gearpump.streaming.kafka._
 import org.apache.gearpump.streaming.sink.DataSinkProcessor
 import org.apache.gearpump.streaming.source.DataSourceProcessor
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
@@ -48,25 +50,31 @@ object KafkaWordCount extends AkkaApp with ArgumentsParser {
 
   def application(config: ParseResult, system: ActorSystem): StreamApplication 
= {
     implicit val actorSystem = system
+    val appName = "KafkaWordCount"
     val sourceNum = config.getInt("source")
     val splitNum = config.getInt("split")
     val sumNum = config.getInt("sum")
     val sinkNum = config.getInt("sink")
-
     val appConfig = UserConfig.empty
-    val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", 
"localhost:9092")
-    val kafkaSourceConfig = new KafkaSourceConfig()
-      
.withConsumerTopics("topic1").withConsumerStartOffset(OffsetRequest.LatestTime)
-    val source = new KafkaSource(kafkaSourceConfig, offsetStorageFactory)
+    val props = new Properties
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+    props.put(KafkaConfig.CONSUMER_START_OFFSET_CONFIG,
+      new java.lang.Long(OffsetRequest.LatestTime))
+    props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
+    val sourceTopic = "topic1"
+    val source = new KafkaSource(sourceTopic, props)
+    val checkpointStoreFactory = new KafkaStoreFactory(props)
+    source.setCheckpointStore(checkpointStoreFactory)
     val sourceProcessor = DataSourceProcessor(source, sourceNum)
     val split = Processor[Split](splitNum)
     val sum = Processor[Sum](sumNum)
-    val sink = new KafkaSink("topic2", "localhost:9092")
+    val sink = new KafkaSink("topic2", props)
     val sinkProcessor = DataSinkProcessor(sink, sinkNum)
     val partitioner = new HashPartitioner
     val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~>
       sum ~ partitioner ~> sinkProcessor
-    val app = StreamApplication("KafkaWordCount", Graph(computation), 
appConfig)
+    val app = StreamApplication(appName, Graph(computation), appConfig)
     app
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
index 13bef0d..5a3954a 100644
--- 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
+++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
@@ -18,7 +18,10 @@
 
 package org.apache.gearpump.streaming.examples.state
 
+import java.util.Properties
+
 import akka.actor.ActorSystem
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.gearpump.cluster.UserConfig
@@ -28,7 +31,7 @@ import org.apache.gearpump.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.examples.state.processor.CountProcessor
 import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
 import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
-import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
+import org.apache.gearpump.streaming.kafka.{KafkaStoreFactory, KafkaSink, 
KafkaSource}
 import org.apache.gearpump.streaming.sink.DataSinkProcessor
 import org.apache.gearpump.streaming.source.DataSourceProcessor
 import org.apache.gearpump.streaming.state.impl.PersistentStateConfig
@@ -63,6 +66,7 @@ object MessageCountApp extends AkkaApp with ArgumentsParser {
   )
 
   def application(config: ParseResult)(implicit system: ActorSystem): 
StreamApplication = {
+    val appName = "MessageCount"
     val hadoopConfig = new Configuration
     hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS))
     val checkpointStoreFactory = new 
HadoopCheckpointStoreFactory("MessageCount", hadoopConfig,
@@ -73,19 +77,23 @@ object MessageCountApp extends AkkaApp with ArgumentsParser 
{
       .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
       .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, 
checkpointStoreFactory)
 
-    val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT)
+    val properties = new Properties
+    properties.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, 
config.getString(ZOOKEEPER_CONNECT))
     val brokerList = config.getString(BROKER_LIST)
-    val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, 
brokerList)
+    properties.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    properties.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
+    val kafkaStoreFactory = new KafkaStoreFactory(properties)
     val sourceTopic = config.getString(SOURCE_TOPIC)
-    val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, 
offsetStorageFactory)
+    val kafkaSource = new KafkaSource(sourceTopic, properties)
+    kafkaSource.setCheckpointStore(kafkaStoreFactory)
     val sourceProcessor = DataSourceProcessor(kafkaSource, 
config.getInt(SOURCE_TASK))
     val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), 
taskConf = taskConfig)
-    val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList)
+    val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), properties)
     val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK))
     val partitioner = new HashPartitioner()
     val graph = Graph(sourceProcessor ~ partitioner
       ~> countProcessor ~ partitioner ~> sinkProcessor)
-    val app = StreamApplication("MessageCount", graph, UserConfig.empty)
+    val app = StreamApplication(appName, graph, UserConfig.empty)
     app
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
index ee44536..6b5bba0 100644
--- 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -28,6 +28,7 @@ import org.apache.gearpump.partitioner.Partitioner;
 import org.apache.gearpump.streaming.javaapi.Graph;
 import org.apache.gearpump.streaming.javaapi.Processor;
 import org.apache.gearpump.streaming.javaapi.StreamApplication;
+import org.apache.gearpump.util.Constants;
 
 /** Java version of WordCount with Processor Graph API */
 public class WordCount {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
index e5e0f13..acc2438 100644
--- 
a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
+++ 
b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
@@ -40,21 +40,27 @@ class HadoopCheckpointStoreFactory(
   extends CheckpointStoreFactory {
   import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory._
 
+  /**
+   * Overrides Java's default serialization
+   * Please do not remove this
+   */
   private def writeObject(out: ObjectOutputStream): Unit = {
     out.defaultWriteObject()
     hadoopConfig.write(out)
   }
 
+  /**
+   * Overrides Java's default deserialization
+   * Please do not remove this
+   */
   private def readObject(in: ObjectInputStream): Unit = {
     in.defaultReadObject()
     hadoopConfig = new Configuration(false)
     hadoopConfig.readFields(in)
   }
 
-  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): 
CheckpointStore = {
-    import taskContext.{appId, taskId}
-    val dirPath = new Path(dir + Path.SEPARATOR + s"v$VERSION",
-      s"app$appId-task${taskId.processorId}_${taskId.index}")
+  override def getCheckpointStore(name: String): CheckpointStore = {
+    val dirPath = new Path(dir + Path.SEPARATOR + s"v$VERSION", name)
     val fs = HadoopUtil.getFileSystemForPath(dirPath, hadoopConfig)
     new HadoopCheckpointStore(dirPath, fs, hadoopConfig, rotation)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
 
b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
index 4fd8dc1..76145e8 100644
--- 
a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
+++ 
b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
@@ -48,7 +48,8 @@ class HadoopCheckpointStoreIntegrationSpec
       val rootDirName = "test"
       val rootDir = new Path(rootDirName + Path.SEPARATOR +
         s"v${HadoopCheckpointStoreFactory.VERSION}")
-      val subDir = new Path(rootDir, "app0-task0_0")
+      val subDirName = "app0-task0_0"
+      val subDir = new Path(rootDir, subDirName)
 
       val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig)
       fs.delete(rootDir, true)
@@ -56,7 +57,7 @@ class HadoopCheckpointStoreIntegrationSpec
 
       val checkpointStoreFactory = new HadoopCheckpointStoreFactory(
         rootDirName, hadoopConfig, new FileSizeRotation(fileSize))
-      val checkpointStore = 
checkpointStoreFactory.getCheckpointStore(userConfig, taskContext)
+      val checkpointStore = 
checkpointStoreFactory.getCheckpointStore(subDirName)
 
       checkpointStore.persist(0L, Array(0.toByte))
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSink.java
 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSink.java
new file mode 100644
index 0000000..f8d85f7
--- /dev/null
+++ 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSink.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka;
+
+import org.apache.gearpump.streaming.kafka.lib.sink.AbstractKafkaSink;
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig;
+import org.apache.gearpump.streaming.sink.DataSink;
+
+import java.util.Properties;
+
+/**
+ * USER API for kafka sink connector.
+ * Please refer to {@link AbstractKafkaSink} for detailed descriptions and 
implementations.
+ */
+public class KafkaSink extends AbstractKafkaSink implements DataSink {
+
+  public KafkaSink(String topic, Properties props) {
+    super(topic, props);
+  }
+
+  KafkaSink(String topic, Properties props,
+      KafkaConfig.KafkaConfigFactory kafkaConfigFactory,
+      KafkaProducerFactory factory) {
+    super(topic, props, kafkaConfigFactory, factory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSource.java
 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSource.java
new file mode 100644
index 0000000..fdc279b
--- /dev/null
+++ 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSource.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka;
+
+import kafka.common.TopicAndPartition;
+import org.apache.gearpump.streaming.kafka.lib.source.AbstractKafkaSource;
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient;
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread;
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig;
+import org.apache.gearpump.streaming.transaction.api.CheckpointStore;
+import org.apache.gearpump.streaming.transaction.api.TimeReplayableSource;
+
+import java.util.Properties;
+
+/**
+ * USER API for kafka source connector.
+ * Please refer to {@link AbstractKafkaSource} for detailed descriptions and 
implementations.
+ */
+public class KafkaSource extends AbstractKafkaSource implements 
TimeReplayableSource {
+
+  public KafkaSource(String topic, Properties properties) {
+    super(topic, properties);
+  }
+
+  // constructor for tests
+  KafkaSource(String topic, Properties properties,
+      KafkaConfig.KafkaConfigFactory configFactory,
+      KafkaClient.KafkaClientFactory clientFactory,
+      FetchThread.FetchThreadFactory threadFactory) {
+    super(topic, properties, configFactory, clientFactory, threadFactory);
+  }
+
+  /**
+   * for tests only
+   */
+  protected void addPartitionAndStore(TopicAndPartition tp, CheckpointStore 
store) {
+    addCheckpointStore(tp, store);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaStoreFactory.java
 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaStoreFactory.java
new file mode 100644
index 0000000..2521e70
--- /dev/null
+++ 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaStoreFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka;
+
+import org.apache.gearpump.streaming.kafka.lib.store.AbstractKafkaStoreFactory;
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig;
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory;
+
+import java.util.Properties;
+
+/**
+ * USER API for kafka store factory.
+ * Please refer to {@link AbstractKafkaStoreFactory} for detailed descriptions 
and implementations.
+ */
+public class KafkaStoreFactory extends AbstractKafkaStoreFactory implements 
CheckpointStoreFactory {
+
+  public KafkaStoreFactory(Properties props) {
+    super(props);
+  }
+
+  /** constructor for tests */
+  KafkaStoreFactory(Properties props, KafkaConfig.KafkaConfigFactory factory) {
+    super(props, factory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
new file mode 100644
index 0000000..403f213
--- /dev/null
+++ 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.util;
+
+import kafka.api.OffsetRequest;
+import kafka.common.TopicAndPartition;
+import kafka.consumer.ConsumerConfig;
+import org.apache.gearpump.streaming.kafka.lib.source.DefaultMessageDecoder;
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient;
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread;
+import 
org.apache.gearpump.streaming.kafka.lib.source.grouper.DefaultPartitionGrouper;
+import org.apache.gearpump.streaming.source.DefaultTimeStampFilter;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+
+/**
+ * kafka specific configs
+ */
+public class KafkaConfig extends AbstractConfig implements Serializable {
+
+  private static final ConfigDef CONFIG;
+
+  public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
+  private static final String ZOOKEEPER_CONNECT_DOC =
+      "Zookeeper connect string for Kafka topics management.";
+
+  public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+  public static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port 
pairs to use for "
+      + "establishing the initial connection to the Kafka cluster. "
+      + "The client will make use of all servers irrespective of which servers 
are specified "
+      + "here for bootstrapping&mdash;this list only impacts the initial hosts 
used to discover "
+      + "the full set of servers. This list should be in the form "
+      + "<code>host1:port1,host2:port2,...</code>. Since these servers are 
just used for the "
+      + "initial connection to discover the full cluster membership (which may 
change dynamically),"
+      + " this list need not contain the full set of servers (you may want 
more than one, though, "
+      + "in case a server is down).";
+
+  public static final String CLIENT_ID_CONFIG = "client.id";
+  public static final String CLIENT_ID_DOC = "An id string to pass to the 
server when making "
+      + "requests. The purpose of this is to be able to track the source of 
requests beyond just "
+      + "ip/port by allowing a logical application name to be included in 
server-side request "
+      + "logging.";
+
+  public static final String GROUP_ID_CONFIG = "group.id";
+  public static final String GROUP_ID_DOC =
+      "A string that uniquely identifies a set of consumers within the same 
consumer group.";
+
+  public static final String ENABLE_AUTO_COMMIT_CONFIG = "auto.commit.enable";
+  public static final String ENABLE_AUTO_COMMIT_DOC =
+      "If true the consumer's offset will be periodically committed in the 
background.";
+
+  /** KafkaSource specific configs */
+  public static final String CONSUMER_START_OFFSET_CONFIG = 
"consumer.start.offset";
+  private static final String CONSUMER_START_OFFSET_DOC = "kafka offset to 
start consuming from. "
+      + "This will be overwritten when checkpoint recovery takes effect.";
+
+  public static final String FETCH_THRESHOLD_CONFIG = "fetch.threshold";
+  private static final String FETCH_THRESHOLD_DOC = "kafka messages are 
fetched asynchronously "
+      + "and put onto an internal queue. When the number of messages in the 
queue hits the threshold, "
+      + "the fetch thread stops fetching, and goes to sleep. It starts 
fetching again when the "
+      + "number falls below the threshold";
+
+  public static final String FETCH_SLEEP_MS_CONFIG = "fetch.sleep.ms";
+  private static final String FETCH_SLEEP_MS_DOC =
+      "The amount of time to sleep when hitting fetch.threshold.";
+
+  public static final String MESSAGE_DECODER_CLASS_CONFIG = 
"message.decoder.class";
+  private static final String MESSAGE_DECODER_CLASS_DOC =
+      "Message decoder class that implements the <code>MessageDecoder</code> 
interface.";
+
+  public static final String TIMESTAMP_FILTER_CLASS_CONFIG = 
"timestamp.filter.class";
+  private static final String TIMESTAMP_FILTER_CLASS_DOC =
+      "Timestamp filter class that implements the <code>TimeStampFilter</code> 
interface";
+
+  public static final String PARTITION_GROUPER_CLASS_CONFIG = 
"partition.grouper";
+  private static final String PARTITION_GROUPER_CLASS_DOC =
+      "Partition grouper class that implements the <code>KafkaGrouper</code> 
interface.";
+
+  public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
+  public static final String REPLICATION_FACTOR_DOC =
+      "The replication factor for checkpoint store topic.";
+
+  public static final String CHECKPOINT_STORE_NAME_PREFIX_CONFIG = 
"checkpoint.store.name.prefix";
+  public static final String CHECKPOINT_STORE_NAME_PREFIX_DOC = "name prefix 
for checkpoint "
+      + "store whose name will be of the form, 
namePrefix-sourceTopic-partitionId";
+
+  static {
+    CONFIG = new ConfigDef()
+        .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value
+            ConfigDef.Type.LIST,
+            ConfigDef.Importance.HIGH,
+            BOOTSTRAP_SERVERS_DOC)
+        .define(CLIENT_ID_CONFIG,
+            ConfigDef.Type.STRING,
+            "",
+            ConfigDef.Importance.HIGH,
+            CLIENT_ID_DOC)
+        .define(GROUP_ID_CONFIG,
+            ConfigDef.Type.STRING,
+            "",
+            ConfigDef.Importance.HIGH,
+            GROUP_ID_DOC)
+        .define(ZOOKEEPER_CONNECT_CONFIG, // required with no default value
+            ConfigDef.Type.STRING,
+            ConfigDef.Importance.HIGH,
+            ZOOKEEPER_CONNECT_DOC)
+        .define(REPLICATION_FACTOR_CONFIG,
+            ConfigDef.Type.INT,
+            1,
+            ConfigDef.Range.atLeast(1),
+            ConfigDef.Importance.MEDIUM,
+            REPLICATION_FACTOR_DOC)
+        .define(MESSAGE_DECODER_CLASS_CONFIG,
+            ConfigDef.Type.CLASS,
+            DefaultMessageDecoder.class.getName(),
+            ConfigDef.Importance.MEDIUM,
+            MESSAGE_DECODER_CLASS_DOC)
+        .define(TIMESTAMP_FILTER_CLASS_CONFIG,
+            ConfigDef.Type.CLASS,
+            DefaultTimeStampFilter.class.getName(),
+            ConfigDef.Importance.MEDIUM,
+            TIMESTAMP_FILTER_CLASS_DOC)
+        .define(PARTITION_GROUPER_CLASS_CONFIG,
+            ConfigDef.Type.CLASS,
+            DefaultPartitionGrouper.class.getName(),
+            ConfigDef.Importance.MEDIUM,
+            PARTITION_GROUPER_CLASS_DOC)
+        .define(FETCH_THRESHOLD_CONFIG,
+            ConfigDef.Type.INT,
+            10000,
+            ConfigDef.Range.atLeast(0),
+            ConfigDef.Importance.LOW,
+            FETCH_THRESHOLD_DOC)
+        .define(FETCH_SLEEP_MS_CONFIG,
+            ConfigDef.Type.INT,
+            100,
+            ConfigDef.Range.atLeast(0),
+            ConfigDef.Importance.LOW,
+            FETCH_SLEEP_MS_DOC)
+        .define(CONSUMER_START_OFFSET_CONFIG,
+            ConfigDef.Type.LONG,
+            OffsetRequest.EarliestTime(),
+            ConfigDef.Range.atLeast(-2),
+            ConfigDef.Importance.MEDIUM,
+            CONSUMER_START_OFFSET_DOC)
+        .define(ENABLE_AUTO_COMMIT_CONFIG,
+            ConfigDef.Type.BOOLEAN,
+            true,
+            ConfigDef.Importance.MEDIUM,
+            ENABLE_AUTO_COMMIT_DOC)
+        .define(CHECKPOINT_STORE_NAME_PREFIX_CONFIG,
+            ConfigDef.Type.STRING,
+            "",
+            ConfigDef.Importance.HIGH,
+            CHECKPOINT_STORE_NAME_PREFIX_DOC);
+  }
+
+  public KafkaConfig(Properties props) {
+    super(CONFIG, props);
+  }
+
+  public static String getCheckpointStoreNameSuffix(TopicAndPartition tp) {
+    return tp.topic() + "-" + tp.partition();
+  }
+
+  public ConsumerConfig getConsumerConfig() {
+    Properties props = getBaseConsumerConfigs();
+
+    return new ConsumerConfig(props);
+  }
+
+  public Properties getProducerConfig() {
+    Properties props = new Properties();
+    props.putAll(this.originals());
+
+    // remove source properties
+    removeSourceSpecificConfigs(props);
+
+    // remove consumer properties
+    removeConsumerSpecificConfigs(props);
+
+    return props;
+  }
+
+  public String getKafkaStoreTopic(String suffix) {
+    return getString(CHECKPOINT_STORE_NAME_PREFIX_CONFIG) + "-" + suffix;
+  }
+
+  public KafkaClient.KafkaClientFactory getKafkaClientFactory() {
+    return KafkaClient.factory();
+  }
+
+  public FetchThread.FetchThreadFactory getFetchThreadFactory() {
+    return FetchThread.factory();
+  }
+
+
+  private Properties getBaseConsumerConfigs() {
+    Properties props = new Properties();
+    props.putAll(this.originals());
+
+    // remove source properties
+    removeSourceSpecificConfigs(props);
+
+    // remove producer properties
+    removeProducerSpecificConfigs(props);
+
+    // set consumer default property values
+    if (!props.containsKey(GROUP_ID_CONFIG)) {
+      props.put(GROUP_ID_CONFIG, getString(GROUP_ID_CONFIG));
+    }
+    props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+    return props;
+  }
+
+  private void removeSourceSpecificConfigs(Properties props) {
+    props.remove(FETCH_SLEEP_MS_CONFIG);
+    props.remove(FETCH_THRESHOLD_CONFIG);
+    props.remove(PARTITION_GROUPER_CLASS_CONFIG);
+    props.remove(MESSAGE_DECODER_CLASS_CONFIG);
+    props.remove(TIMESTAMP_FILTER_CLASS_CONFIG);
+    props.remove(REPLICATION_FACTOR_CONFIG);
+    props.remove(CHECKPOINT_STORE_NAME_PREFIX_CONFIG);
+  }
+
+  private void removeConsumerSpecificConfigs(Properties props) {
+    props.remove(ZOOKEEPER_CONNECT_CONFIG);
+    props.remove(GROUP_ID_CONFIG);
+  }
+
+  private void removeProducerSpecificConfigs(Properties props) {
+    props.remove(BOOTSTRAP_SERVERS_CONFIG);
+  }
+
+
+  public static class KafkaConfigFactory implements Serializable {
+    public KafkaConfig getKafkaConfig(Properties props) {
+      return new KafkaConfig(props);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala
deleted file mode 100644
index cb90f93..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka
-
-import java.util.Properties
-
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.ByteArraySerializer
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.kafka.lib.KafkaUtil
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.task.TaskContext
-
-/**
- * kafka sink connectors that invokes 
org.apache.kafka.clients.producer.KafkaProducer to send
- * messages to kafka queue
- * @param getProducer is a function to construct a KafkaProducer
- * @param topic is the kafka topic to write to
- */
-class KafkaSink private[kafka](
-    getProducer: () => KafkaProducer[Array[Byte], Array[Byte]], topic: String) 
extends DataSink {
-
-  /**
-   * @param topic producer topic
-   * @param properties producer config
-   */
-  def this(topic: String, properties: Properties) = {
-    this(() => KafkaUtil.createKafkaProducer(properties,
-      new ByteArraySerializer, new ByteArraySerializer), topic)
-  }
-
-  /**
-   *
-   * creates an empty properties with `bootstrap.servers` set to 
`bootstrapServers`
-   * and invokes `KafkaSink(topic, properties)`
-   * @param topic producer topic
-   * @param bootstrapServers kafka producer config `bootstrap.servers`
-   */
-  def this(topic: String, bootstrapServers: String) = {
-    this(topic, KafkaUtil.buildProducerConfig(bootstrapServers))
-  }
-
-  // Lazily construct producer since KafkaProducer is not serializable
-  private lazy val producer = getProducer()
-
-  override def open(context: TaskContext): Unit = {}
-
-  override def write(message: Message): Unit = {
-    val record = message.msg match {
-      case (k, v) =>
-        new ProducerRecord[Array[Byte], Array[Byte]](topic, 
k.asInstanceOf[Array[Byte]],
-          v.asInstanceOf[Array[Byte]])
-      case v =>
-        new ProducerRecord[Array[Byte], Array[Byte]](topic, 
v.asInstanceOf[Array[Byte]])
-    }
-    producer.send(record)
-  }
-
-  override def close(): Unit = {
-    producer.close()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
deleted file mode 100644
index 339711b..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka
-
-import java.util.Properties
-import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Success}
-
-import kafka.common.TopicAndPartition
-import org.slf4j.Logger
-
-import org.apache.gearpump.streaming.kafka.lib.consumer.{FetchThread, 
KafkaMessage}
-import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, 
KafkaOffsetManager, KafkaSourceConfig, KafkaUtil}
-import org.apache.gearpump.streaming.source.DefaultTimeStampFilter
-import org.apache.gearpump.streaming.task.TaskContext
-import org.apache.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
-import org.apache.gearpump.streaming.transaction.api._
-import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
-
-object KafkaSource {
-  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource])
-}
-
-/**
- * Kafka source connectors that pulls a batch of messages 
(`kafka.consumer.emit.batch.size`)
- * from multiple Kafka TopicAndPartition in a round-robin way.
- *
- * This is a TimeReplayableSource which is able to replay messages given a 
start time.
- * Each kafka message is tagged with a timestamp by
- * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and the 
(offset, timestamp)
- * mapping is stored to a 
[[org.apache.gearpump.streaming.transaction.api.OffsetStorage]].
- * On recovery, we could retrieve the previously stored offset from the
- * [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] by 
timestamp and start to read
- * from there.
- *
- * kafka message is wrapped into gearpump [[org.apache.gearpump.Message]] and 
further filtered by a
- * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]]
- * such that obsolete messages are dropped.
- *
- * @param config kafka source config
- * @param offsetStorageFactory factory to build [[OffsetStorage]]
- * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes
- * @param timestampFilter filters out message based on timestamp
- * @param fetchThread fetches messages and puts on a in-memory queue
- * @param offsetManagers manages offset-to-timestamp storage for each 
kafka.common.TopicAndPartition
- */
-class KafkaSource(
-    config: KafkaSourceConfig,
-    offsetStorageFactory: OffsetStorageFactory,
-    messageDecoder: MessageDecoder = new DefaultMessageDecoder,
-    timestampFilter: TimeStampFilter = new DefaultTimeStampFilter,
-    private var fetchThread: Option[FetchThread] = None,
-    private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = {
-      Map.empty[TopicAndPartition, KafkaOffsetManager]
-    }) extends TimeReplayableSource {
-  import org.apache.gearpump.streaming.kafka.KafkaSource._
-
-  private var startTime: Option[TimeStamp] = None
-
-  /**
-   * Constructs a Kafka Source by...
-   *
-   * @param topics comma-separated string of topics
-   * @param properties kafka consumer config
-   * @param offsetStorageFactory 
org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
-   *     that creates 
[[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
-   *
-   */
-  def this(topics: String, properties: Properties, offsetStorageFactory: 
OffsetStorageFactory) = {
-    this(KafkaSourceConfig(properties).withConsumerTopics(topics), 
offsetStorageFactory)
-  }
-  /**
-   * Constructs a Kafka Source by...
-   *
-   * @param topics comma-separated string of topics
-   * @param properties kafka consumer config
-   * @param offsetStorageFactory 
org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
-   *   that creates 
[[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
-   * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw 
bytes
-   * @param timestampFilter filters out message based on timestamp
-   */
-  def this(topics: String, properties: Properties, offsetStorageFactory: 
OffsetStorageFactory,
-      messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = {
-    this(KafkaSourceConfig(properties)
-      .withConsumerTopics(topics), offsetStorageFactory,
-      messageDecoder, timestampFilter)
-  }
-
-  /**
-   * Constructs a Kafka Source by...
-   *
-   * @param topics comma-separated string of topics
-   * @param zkConnect kafka consumer config `zookeeper.connect`
-   * @param offsetStorageFactory 
org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
-   *     that creates 
[[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
-   */
-  def this(topics: String, zkConnect: String, offsetStorageFactory: 
OffsetStorageFactory) =
-    this(topics, KafkaUtil.buildConsumerConfig(zkConnect), 
offsetStorageFactory)
-
-  /**
-   * Constructs a Kafka Source by...
-   *
-   * @param topics comma-separated string of topics
-   * @param zkConnect kafka consumer config `zookeeper.connect`
-   * @param offsetStorageFactory 
org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory
-   *     that creates 
[[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]
-   * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw 
bytes
-   * @param timestampFilter filters out message based on timestamp
-   */
-  def this(topics: String, zkConnect: String, offsetStorageFactory: 
OffsetStorageFactory,
-      messageDecoder: MessageDecoder,
-      timestampFilter: TimeStampFilter) = {
-    this(topics, KafkaUtil.buildConsumerConfig(zkConnect), 
offsetStorageFactory,
-      messageDecoder, timestampFilter)
-  }
-
-  LOG.debug(s"assigned ${offsetManagers.keySet}")
-
-  private[kafka] def setStartTime(startTime: Option[TimeStamp]): Unit = {
-    this.startTime = startTime
-    fetchThread.foreach { fetch =>
-      this.startTime.foreach { time =>
-        offsetManagers.foreach { case (tp, offsetManager) =>
-          offsetManager.resolveOffset(time) match {
-            case Success(offset) =>
-              LOG.debug(s"set start offset to $offset for $tp")
-              fetch.setStartOffset(tp, offset)
-            case Failure(StorageEmpty) =>
-              LOG.debug(s"no previous TimeStamp stored")
-            case Failure(e) => throw e
-          }
-        }
-      }
-      fetch.setDaemon(true)
-      fetch.start()
-    }
-  }
-
-  override def open(context: TaskContext, startTime: TimeStamp): Unit = {
-    import context.{appId, appName, parallelism, taskId}
-
-    val topics = config.getConsumerTopics
-    val grouper = config.getGrouper
-    val consumerConfig = config.consumerConfig
-    val topicAndPartitions = grouper.group(parallelism, taskId.index,
-      
KafkaUtil.getTopicAndPartitions(KafkaUtil.connectZookeeper(consumerConfig)(), 
topics))
-    this.fetchThread = Some(FetchThread(topicAndPartitions, 
config.getFetchThreshold,
-      config.getFetchSleepMS, config.getConsumerStartOffset, consumerConfig))
-    this.offsetManagers = topicAndPartitions.map { tp =>
-      val storageTopic = s"app${appId}_${appName}_${tp.topic}_${tp.partition}"
-      val storage = offsetStorageFactory.getOffsetStorage(storageTopic)
-      tp -> new KafkaOffsetManager(storage)
-    }.toMap
-
-    setStartTime(Option(startTime))
-  }
-
-  override def read(): Message = {
-    fetchThread.flatMap(_.poll.flatMap(filterMessage)).orNull
-  }
-
-  private def filterMessage(kafkaMsg: KafkaMessage): Option[Message] = {
-    val msgOpt = offsetManagers(kafkaMsg.topicAndPartition)
-      .filter(messageDecoder.fromBytes(kafkaMsg.msg) -> kafkaMsg.offset)
-    msgOpt.flatMap { msg =>
-      startTime match {
-        case None =>
-          Some(msg)
-        case Some(time) =>
-          timestampFilter.filter(msg, time)
-      }
-    }
-  }
-
-  override def close(): Unit = {
-    offsetManagers.foreach(_._2.close())
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
deleted file mode 100644
index 8748999..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka
-
-import java.util.Properties
-import scala.collection.mutable
-import scala.util.{Failure, Success, Try}
-
-import com.twitter.bijection.Injection
-import kafka.api.OffsetRequest
-import kafka.consumer.ConsumerConfig
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.slf4j.Logger
-
-import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.streaming.kafka.lib.KafkaUtil
-import org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
-import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, 
StorageEmpty, Underflow}
-import org.apache.gearpump.streaming.transaction.api.{OffsetStorage, 
OffsetStorageFactory}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Factory that builds [[KafkaStorage]]
- *
- * @param consumerProps kafka consumer config
- * @param producerProps kafka producer config
- */
-class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties)
-  extends OffsetStorageFactory {
-
-  /**
-   * Creates consumer config properties with `zookeeper.connect` set to 
zkConnect
-   * and producer config properties with `bootstrap.servers` set to 
bootstrapServers
-   *
-   * @param zkConnect kafka consumer config `zookeeper.connect`
-   * @param bootstrapServers kafka producer config `bootstrap.servers`
-   */
-  def this(zkConnect: String, bootstrapServers: String) =
-    this(KafkaUtil.buildConsumerConfig(zkConnect), 
KafkaUtil.buildProducerConfig(bootstrapServers))
-
-  override def getOffsetStorage(dir: String): OffsetStorage = {
-    val topic = dir
-    val consumerConfig = new ConsumerConfig(consumerProps)
-    val getConsumer = () => KafkaConsumer(topic, 0, 
OffsetRequest.EarliestTime, consumerConfig)
-    new KafkaStorage(topic, KafkaUtil.createKafkaProducer[Array[Byte], 
Array[Byte]](
-      producerProps, new ByteArraySerializer, new ByteArraySerializer),
-      getConsumer(), KafkaUtil.connectZookeeper(consumerConfig)())
-  }
-}
-
-object KafkaStorage {
-  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaStorage])
-}
-
-/**
- * Stores offset-timestamp mapping to kafka
- *
- * @param topic kafka store topic
- * @param producer kafka producer
- * @param getConsumer function to get kafka consumer
- * @param connectZk function to connect zookeeper
- */
-class KafkaStorage private[kafka](
-    topic: String,
-    producer: KafkaProducer[Array[Byte], Array[Byte]],
-    getConsumer: => KafkaConsumer,
-    connectZk: => ZkClient)
-  extends OffsetStorage {
-
-  private lazy val consumer = getConsumer
-
-  private val dataByTime: List[(TimeStamp, Array[Byte])] = {
-    if (KafkaUtil.topicExists(connectZk, topic)) {
-      load(consumer)
-    } else {
-      List.empty[(TimeStamp, Array[Byte])]
-    }
-  }
-
-  /**
-   * Offsets with timestamp less than `time` have already been processed by 
the system
-   * so we look up the storage for the first offset with timestamp large equal 
than `time`
-   * on replay.
-   *
-   * @param time the timestamp to look up for the earliest unprocessed offset
-   * @return the earliest unprocessed offset if `time` is in the range, 
otherwise failure
-   */
-  override def lookUp(time: TimeStamp): Try[Array[Byte]] = {
-    if (dataByTime.isEmpty) {
-      Failure(StorageEmpty)
-    } else {
-      val min = dataByTime.head
-      val max = dataByTime.last
-      if (time < min._1) {
-        Failure(Underflow(min._2))
-      } else if (time > max._1) {
-        Failure(Overflow(max._2))
-      } else {
-        Success(dataByTime.find(_._1 >= time).get._2)
-      }
-    }
-  }
-
-  override def append(time: TimeStamp, offset: Array[Byte]): Unit = {
-    val message = new ProducerRecord[Array[Byte], Array[Byte]](
-      topic, 0, Injection[Long, Array[Byte]](time), offset)
-    producer.send(message)
-  }
-
-  override def close(): Unit = {
-    producer.close()
-    KafkaUtil.deleteTopic(connectZk, topic)
-  }
-
-  private[kafka] def load(consumer: KafkaConsumer): List[(TimeStamp, 
Array[Byte])] = {
-    var messagesBuilder = new mutable.ArrayBuilder.ofRef[(TimeStamp, 
Array[Byte])]
-    while (consumer.hasNext) {
-      val kafkaMsg = consumer.next
-      kafkaMsg.key.map { k =>
-        Injection.invert[TimeStamp, Array[Byte]](k) match {
-          case Success(time) =>
-            messagesBuilder += (time -> kafkaMsg.msg)
-          case Failure(e) => throw e
-        }
-      } orElse (throw new RuntimeException("offset key should not be null"))
-    }
-    consumer.close()
-    messagesBuilder.result().toList
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
index 5f48b43..b34149f 100644
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
@@ -28,18 +28,11 @@ class KafkaDSLSink[T](stream: dsl.Stream[T]) {
   /** Create a Kafka DSL Sink */
   def writeToKafka(
       topic: String,
-      bootstrapServers: String,
-      parallism: Int,
-      description: String): dsl.Stream[T] = {
-    stream.sink(new KafkaSink(topic, bootstrapServers), parallism, 
UserConfig.empty, description)
-  }
-
-  def writeToKafka(
-      topic: String,
       properties: Properties,
-      parallism: Int,
-      description: String): dsl.Stream[T] = {
-    stream.sink(new KafkaSink(topic, properties), parallism, UserConfig.empty, 
description)
+      parallelism: Int = 1,
+      userConfig: UserConfig = UserConfig.empty,
+      description: String = null): dsl.Stream[T] = {
+    stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, 
description)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
index 0275966..874d691 100644
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
@@ -22,67 +22,16 @@ import java.util.Properties
 import org.apache.gearpump.streaming.dsl
 import org.apache.gearpump.streaming.dsl.StreamApp
 import org.apache.gearpump.streaming.kafka.KafkaSource
-import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, 
KafkaSourceConfig}
-import org.apache.gearpump.streaming.transaction.api.{MessageDecoder, 
OffsetStorageFactory, TimeStampFilter}
 
 object KafkaDSLUtil {
-  def createStream[T](
-      app: StreamApp,
-      parallelism: Int,
-      description: String,
-      kafkaConfig: KafkaSourceConfig,
-      offsetStorageFactory: OffsetStorageFactory,
-      messageDecoder: MessageDecoder = new DefaultMessageDecoder): 
dsl.Stream[T] = {
-    app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, 
messageDecoder),
-      parallelism, description)
-  }
-
-  def createStream[T](
-      app: StreamApp,
-      parallelism: Int,
-      description: String,
-      topics: String,
-      zkConnect: String,
-      offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory),
-      parallelism, description)
-  }
-
-  def createStream[T](
-      app: StreamApp,
-      parallelism: Int,
-      description: String,
-      topics: String,
-      zkConnect: String,
-      offsetStorageFactory: OffsetStorageFactory,
-      messageDecoder: MessageDecoder,
-      timestampFilter: TimeStampFilter): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory,
-    messageDecoder, timestampFilter), parallelism, description)
-  }
-
-  def createStream[T](
-      app: StreamApp,
-      parallelism: Int,
-      description: String,
-      topics: String,
-      properties: Properties,
-      offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory),
-    parallelism, description)
-  }
 
   def createStream[T](
       app: StreamApp,
       topics: String,
       parallelism: Int,
       description: String,
-      properties: Properties,
-      offsetStorageFactory: OffsetStorageFactory,
-      messageDecoder: MessageDecoder,
-      timestampFilter: TimeStampFilter): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory,
-    messageDecoder, timestampFilter), parallelism, description)
+      properties: Properties): dsl.Stream[T] = {
+    app.source[T](new KafkaSource(topics, properties), parallelism, 
description)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
deleted file mode 100644
index ea7e8d1..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib
-
-import scala.util.{Failure, Success}
-
-import com.twitter.bijection.Injection
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.transaction.api.MessageDecoder
-
-class DefaultMessageDecoder extends MessageDecoder {
-  override def fromBytes(bytes: Array[Byte]): Message = {
-    Message(bytes, System.currentTimeMillis())
-  }
-}
-
-class StringMessageDecoder extends MessageDecoder {
-  override def fromBytes(bytes: Array[Byte]): Message = {
-    Injection.invert[String, Array[Byte]](bytes) match {
-      case Success(s) => Message(s, System.currentTimeMillis())
-      case Failure(e) => throw e
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
deleted file mode 100644
index 88f509b..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib
-
-import scala.util.{Failure, Success, Try}
-
-import com.twitter.bijection.Injection
-import org.slf4j.Logger
-
-import org.apache.gearpump._
-import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, 
StorageEmpty, Underflow}
-import org.apache.gearpump.streaming.transaction.api.{OffsetManager, 
OffsetStorage}
-import org.apache.gearpump.util.LogUtil
-
-object KafkaOffsetManager {
-  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager])
-}
-
-private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends 
OffsetManager {
-  import org.apache.gearpump.streaming.kafka.lib.KafkaOffsetManager._
-
-  var maxTime: TimeStamp = 0L
-
-  override def filter(messageAndOffset: (Message, Long)): Option[Message] = {
-    val (message, offset) = messageAndOffset
-    if (message.timestamp > maxTime) {
-      maxTime = message.timestamp
-      storage.append(maxTime, Injection[Long, Array[Byte]](offset))
-    }
-    Some(message)
-  }
-
-  override def resolveOffset(time: TimeStamp): Try[Long] = {
-    storage.lookUp(time) match {
-      case Success(offset) => Injection.invert[Long, Array[Byte]](offset)
-      case Failure(Overflow(max)) =>
-        LOG.warn(s"start time larger than the max stored TimeStamp; set to max 
offset")
-        Injection.invert[Long, Array[Byte]](max)
-      case Failure(Underflow(min)) =>
-        LOG.warn(s"start time less than the min stored TimeStamp; set to min 
offset")
-        Injection.invert[Long, Array[Byte]](min)
-      case Failure(StorageEmpty) => Failure(StorageEmpty)
-      case Failure(e) => throw e
-    }
-  }
-
-  override def close(): Unit = {
-    storage.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
deleted file mode 100644
index ade414e..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib
-
-import java.util.Properties
-
-import kafka.api.OffsetRequest
-import kafka.consumer.ConsumerConfig
-import org.slf4j.Logger
-
-import org.apache.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, 
KafkaGrouper}
-import org.apache.gearpump.util.LogUtil
-
-object KafkaSourceConfig {
-
-  val NAME = "kafka_config"
-
-  val ZOOKEEPER_CONNECT = "zookeeper.connect"
-  val GROUP_ID = "group.id"
-  val CONSUMER_START_OFFSET = "kafka.consumer.start.offset"
-  val CONSUMER_TOPICS = "kafka.consumer.topics"
-  val FETCH_THRESHOLD = "kafka.consumer.fetch.threshold"
-  val FETCH_SLEEP_MS = "kafka.consumer.fetch.sleep.ms"
-  val GROUPER_CLASS = "kafka.grouper.class"
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  def apply(consumerProps: Properties): KafkaSourceConfig = new 
KafkaSourceConfig(consumerProps)
-}
-
-/**
- * Extends kafka.consumer.ConsumerConfig with specific config needed by
- * [[org.apache.gearpump.streaming.kafka.KafkaSource]]
- *
- * @param consumerProps kafka consumer config
- */
-class KafkaSourceConfig(val consumerProps: Properties = new Properties)
-  extends java.io.Serializable {
-  import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig._
-
-  if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) {
-    consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181")
-  }
-
-  if (!consumerProps.containsKey(GROUP_ID)) {
-    consumerProps.setProperty(GROUP_ID, "gearpump")
-  }
-
-  def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps)
-
-  /**
-   * Set kafka consumer topics, seperated by comma.
-   *
-   * @param topics comma-separated string
-   * @return new KafkaConfig based on this but with
-   *         
[[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]]
-   *         set to given value
-   */
-  def withConsumerTopics(topics: String): KafkaSourceConfig = {
-    consumerProps.setProperty(CONSUMER_TOPICS, topics)
-    KafkaSourceConfig(consumerProps)
-  }
-
-  /**
-   * Returns a list of kafka consumer topics
-   */
-  def getConsumerTopics: List[String] = {
-    
Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList
-  }
-
-  /**
-   * Sets the sleep interval if there are no more message or message buffer is 
full.
-   *
-   * Consumer.FetchThread will sleep for a while if no more messages or
-   * the incoming queue size is above the
-   * 
[[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]
-   *
-   * @param sleepMS sleep interval in milliseconds
-   * @return new KafkaConfig based on this but with
-   *         
[[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]]
-   *         set to given value
-   */
-  def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = {
-    consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS")
-    KafkaSourceConfig(consumerProps)
-  }
-
-  /**
-   * Gets the sleep interval
-   *
-   * Consumer.FetchThread sleeps for a while if no more messages or
-   * the incoming queue is full (size is bigger than the
-   * 
[[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]])
-   *
-   * @return sleep interval in milliseconds
-   */
-  def getFetchSleepMS: Int = {
-    Option(consumerProps.getProperty(FETCH_SLEEP_MS)).getOrElse("100").toInt
-  }
-
-  /**
-   * Sets the batch size we use for one fetch.
-   *
-   * Consumer.FetchThread stops fetching new messages if its incoming queue
-   * size is above the threshold and starts again when the queue size is below 
it
-   *
-   * @param threshold queue size
-   * @return new KafkaConfig based on this but with
-   *         
[[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]
-   *         set to give value
-   */
-  def withFetchThreshold(threshold: Int): KafkaSourceConfig = {
-    consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold")
-    KafkaSourceConfig(consumerProps)
-  }
-
-  /**
-   * Returns fetch batch size.
-   *
-   * Consumer.FetchThread stops fetching new messages if
-   * its incoming queue size is above the threshold and starts again when the 
queue size is below it
-   *
-   * @return fetch threshold
-   */
-  def getFetchThreshold: Int = {
-    Option(consumerProps.getProperty(FETCH_THRESHOLD)).getOrElse("10000").toInt
-  }
-
-  /**
-   * Sets [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], 
which
-   * defines how kafka.common.TopicAndPartitions are mapped to source tasks.
-   *
-   * @param className name of the factory class
-   * @return new KafkaConfig based on this but with
-   *         
[[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]]
-   *         set to given value
-   */
-  def withGrouper(className: String): KafkaSourceConfig = {
-    consumerProps.setProperty(GROUPER_CLASS, className)
-    KafkaSourceConfig(consumerProps)
-  }
-
-  /**
-   * Returns [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] 
instance, which
-   * defines how kafka.common.TopicAndPartitions are mapped to source tasks
-   */
-  def getGrouper: KafkaGrouper = {
-    Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS))
-      
.getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper]
-  }
-
-  def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = {
-    consumerProps.setProperty(CONSUMER_START_OFFSET, s"$earliestOrLatest")
-    KafkaSourceConfig(consumerProps)
-  }
-
-  def getConsumerStartOffset: Long = {
-    Option(consumerProps.getProperty(CONSUMER_START_OFFSET))
-      .getOrElse(s"${OffsetRequest.EarliestTime}").toLong
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
deleted file mode 100644
index e8cf574..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib
-
-import java.io.InputStream
-import java.util.Properties
-
-import kafka.admin.AdminUtils
-import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
-import kafka.consumer.ConsumerConfig
-import kafka.utils.{ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
-import org.apache.kafka.common.serialization.Serializer
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.LogUtil
-
-object KafkaUtil {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  def getBroker(connectZk: => ZkClient, topic: String, partition: Int): Broker 
= {
-    val zkClient = connectZk
-    try {
-      val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-        .getOrElse(throw new RuntimeException(
-          s"leader not available for TopicAndPartition($topic, $partition)"))
-      ZkUtils.getBrokerInfo(zkClient, leader)
-        .getOrElse(throw new RuntimeException(s"broker info not found for 
leader $leader"))
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-        throw e
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: 
List[String])
-    : Array[TopicAndPartition] = {
-    val zkClient = connectZk
-    try {
-      ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
-        case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
-      }.toArray
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-        throw e
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def topicExists(connectZk: => ZkClient, topic: String): Boolean = {
-    val zkClient = connectZk
-    try {
-      AdminUtils.topicExists(zkClient, topic)
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-        throw e
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  /**
-   * create a new kafka topic
-   * return true if topic already exists, and false otherwise
-   */
-  def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, 
replicas: Int)
-    : Boolean = {
-    val zkClient = connectZk
-    try {
-      if (AdminUtils.topicExists(zkClient, topic)) {
-        LOG.info(s"topic $topic exists")
-        true
-      } else {
-        AdminUtils.createTopic(zkClient, topic, partitions, replicas)
-        LOG.info(s"created topic $topic")
-        false
-      }
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-        throw e
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def deleteTopic(connectZk: => ZkClient, topic: String): Unit = {
-    val zkClient = connectZk
-    try {
-      AdminUtils.deleteTopic(zkClient, topic)
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def connectZookeeper(config: ConsumerConfig): () => ZkClient = {
-    val zookeeperConnect = config.zkConnect
-    val sessionTimeout = config.zkSessionTimeoutMs
-    val connectionTimeout = config.zkConnectionTimeoutMs
-    () => new ZkClient(zookeeperConnect, sessionTimeout, connectionTimeout, 
ZKStringSerializer)
-  }
-
-  def loadProperties(filename: String): Properties = {
-    val props = new Properties()
-    var propStream: InputStream = null
-    try {
-      propStream = getClass.getClassLoader.getResourceAsStream(filename)
-      props.load(propStream)
-    } catch {
-      case e: Exception =>
-        LOG.error(s"$filename not found")
-    } finally {
-      if (propStream != null) {
-        propStream.close()
-      }
-    }
-    props
-  }
-
-  def createKafkaProducer[K, V](properties: Properties,
-      keySerializer: Serializer[K],
-      valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
-    if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == 
null) {
-      properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092")
-    }
-    new KafkaProducer[K, V](properties, keySerializer, valueSerializer)
-  }
-
-  def buildProducerConfig(bootstrapServers: String): Properties = {
-    val properties = new Properties()
-    properties.setProperty("bootstrap.servers", bootstrapServers)
-    properties
-  }
-
-  def buildConsumerConfig(zkConnect: String): Properties = {
-    val properties = new Properties()
-    properties.setProperty("zookeeper.connect", zkConnect)
-    properties.setProperty("group.id", "gearpump")
-    properties
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
deleted file mode 100644
index ce17f5a..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.consumer
-
-/**
- * someone sleeps for exponentially increasing duration each time
- * until the cap
- *
- * @param backOffMultiplier The factor by which the duration increases.
- * @param initialDurationMs Time in milliseconds for initial sleep.
- * @param maximumDurationMs Cap up to which we will increase the duration.
- */
-private[consumer] class ExponentialBackoffSleeper(
-    backOffMultiplier: Double = 2.0,
-    initialDurationMs: Long = 100,
-    maximumDurationMs: Long = 10000) {
-
-  require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
-  require(initialDurationMs > 0, "initialDurationMs must be positive")
-  require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be 
>= initialDurationMs")
-
-  private var sleepDuration = initialDurationMs
-
-  def reset(): Unit = {
-    sleepDuration = initialDurationMs
-  }
-
-  def sleep(): Unit = {
-    Thread.sleep(sleepDuration)
-    setNextSleepDuration()
-  }
-
-  def getSleepDuration: Long = sleepDuration
-
-  def setNextSleepDuration(): Unit = {
-    val next = (sleepDuration * backOffMultiplier).asInstanceOf[Long]
-    sleepDuration = math.min(math.max(initialDurationMs, next), 
maximumDurationMs)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
deleted file mode 100644
index 8550207..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.consumer
-
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.LinkedBlockingQueue
-
-import kafka.common.TopicAndPartition
-import kafka.consumer.ConsumerConfig
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.LogUtil
-
-object FetchThread {
-  private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])
-
-  def apply(topicAndPartitions: Array[TopicAndPartition],
-      fetchThreshold: Int,
-      fetchSleepMS: Long,
-      startOffsetTime: Long,
-      consumerConfig: ConsumerConfig): FetchThread = {
-    val createConsumer = (tp: TopicAndPartition) =>
-      KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
-
-    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-    new FetchThread(topicAndPartitions, createConsumer, incomingQueue, 
fetchThreshold, fetchSleepMS)
-  }
-}
-
-/**
- * A thread to fetch messages from multiple kafka 
org.apache.kafka.TopicAndPartition and puts them
- * onto a queue, which is asynchronously polled by a consumer
- *
- * @param createConsumer given a org.apache.kafka.TopicAndPartition, create a
- *                       
[[org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to
- *                       connect to it
- * @param incomingQueue a queue to buffer incoming messages
- * @param fetchThreshold above which thread should stop fetching messages
- * @param fetchSleepMS interval to sleep when no more messages or hitting 
fetchThreshold
- */
-private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
-    createConsumer: TopicAndPartition => KafkaConsumer,
-    incomingQueue: LinkedBlockingQueue[KafkaMessage],
-    fetchThreshold: Int,
-    fetchSleepMS: Long) extends Thread {
-  import org.apache.gearpump.streaming.kafka.lib.consumer.FetchThread._
-
-  private var consumers: Map[TopicAndPartition, KafkaConsumer] = 
createAllConsumers
-
-  def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
-    consumers(tp).setStartOffset(startOffset)
-  }
-
-  def poll: Option[KafkaMessage] = {
-    Option(incomingQueue.poll())
-  }
-
-  override def run(): Unit = {
-    try {
-      var nextOffsets = Map.empty[TopicAndPartition, Long]
-      var reset = false
-      val sleeper = new ExponentialBackoffSleeper(
-        backOffMultiplier = 2.0,
-        initialDurationMs = 100L,
-        maximumDurationMs = 10000L)
-      while (!Thread.currentThread().isInterrupted) {
-        try {
-          if (reset) {
-            nextOffsets = consumers.mapValues(_.getNextOffset)
-            resetConsumers(nextOffsets)
-            reset = false
-          }
-          val hasMoreMessages = fetchMessage
-          sleeper.reset()
-          if (!hasMoreMessages) {
-            Thread.sleep(fetchSleepMS)
-          }
-        } catch {
-          case exception: Exception =>
-            LOG.warn(s"resetting consumers due to $exception")
-            reset = true
-            sleeper.sleep()
-        }
-      }
-    } catch {
-      case e: InterruptedException => LOG.info("fetch thread got interrupted 
exception")
-      case e: ClosedByInterruptException => LOG.info("fetch thread closed by 
interrupt exception")
-    } finally {
-      consumers.values.foreach(_.close())
-    }
-  }
-
-  /**
-   * fetch message from each TopicAndPartition in a round-robin way
-   */
-  def fetchMessage: Boolean = {
-    consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
-      val (_, consumer) = tpAndConsumer
-      if (incomingQueue.size < fetchThreshold) {
-        if (consumer.hasNext) {
-          incomingQueue.put(consumer.next())
-          true
-        } else {
-          hasNext
-        }
-      } else {
-        true
-      }
-    }
-  }
-
-  private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = {
-    topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap
-  }
-
-  private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit 
= {
-    consumers.values.foreach(_.close())
-    consumers = createAllConsumers
-    consumers.foreach { case (tp, consumer) =>
-      consumer.setStartOffset(nextOffsets(tp))
-    }
-  }
-}


Reply via email to