http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala 
b/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala
deleted file mode 100644
index a477ba6..0000000
--- a/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.external.hbase
-
-import java.io.{File, ObjectInputStream, ObjectOutputStream}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
-import org.apache.hadoop.security.UserGroupInformation
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.util.{Constants, FileUtils}
-
-class HBaseSink(
-    userconfig: UserConfig, tableName: String, @transient var configuration: 
Configuration)
-  extends DataSink{
-  lazy val connection = HBaseSink.getConnection(userconfig, configuration)
-  lazy val table = connection.getTable(TableName.valueOf(tableName))
-
-  override def open(context: TaskContext): Unit = {}
-
-  def this(userconfig: UserConfig, tableName: String) = {
-    this(userconfig, tableName, HBaseConfiguration.create())
-  }
-
-  def insert(rowKey: String, columnGroup: String, columnName: String, value: 
String): Unit = {
-    insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup),
-      Bytes.toBytes(columnName), Bytes.toBytes(value))
-  }
-
-  def insert(
-      rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], 
value: Array[Byte])
-    : Unit = {
-    val put = new Put(rowKey)
-    put.addColumn(columnGroup, columnName, value)
-    table.put(put)
-  }
-
-  def put(msg: Any): Unit = {
-    msg match {
-      case seq: Seq[Any] =>
-        seq.foreach(put)
-      case tuple: (_, _, _, _) => {
-        tuple._1 match {
-          case str: String => {
-            insert(tuple._1.asInstanceOf[String], 
tuple._2.asInstanceOf[String],
-              tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String])
-          }
-          case byteArray: Array[Byte@unchecked] => {
-            insert(tuple._1.asInstanceOf[Array[Byte]], 
tuple._2.asInstanceOf[Array[Byte]],
-              tuple._3.asInstanceOf[Array[Byte]], 
tuple._4.asInstanceOf[Array[Byte]])
-          }
-          case _ =>
-          // Skip
-        }
-      }
-    }
-  }
-
-  override def write(message: Message): Unit = {
-    put(message.msg)
-  }
-
-  def close(): Unit = {
-    connection.close()
-    table.close()
-  }
-
-  private def writeObject(out: ObjectOutputStream): Unit = {
-    out.defaultWriteObject()
-    configuration.write(out)
-  }
-
-  private def readObject(in: ObjectInputStream): Unit = {
-    in.defaultReadObject()
-    val clientConf = new Configuration(false)
-    clientConf.readFields(in)
-    configuration = HBaseConfiguration.create(clientConf)
-  }
-}
-
-object HBaseSink {
-  val HBASESINK = "hbasesink"
-  val TABLE_NAME = "hbase.table.name"
-  val COLUMN_FAMILY = "hbase.table.column.family"
-  val COLUMN_NAME = "hbase.table.column.name"
-
-  def apply[T](userconfig: UserConfig, tableName: String): HBaseSink = {
-    new HBaseSink(userconfig, tableName)
-  }
-
-  def apply[T](userconfig: UserConfig, tableName: String, configuration: 
Configuration)
-    : HBaseSink = {
-    new HBaseSink(userconfig, tableName, configuration)
-  }
-
-  private def getConnection(userConfig: UserConfig, configuration: 
Configuration): Connection = {
-    if (UserGroupInformation.isSecurityEnabled) {
-      val principal = 
userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
-      val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
-      if (principal.isEmpty || keytabContent.isEmpty) {
-        val errorMsg = s"HBase is security enabled, user should provide 
kerberos principal in " +
-          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file " +
-          s"in ${Constants.GEARPUMP_KEYTAB_FILE}"
-        throw new Exception(errorMsg)
-      }
-      val keytabFile = File.createTempFile("login", ".keytab")
-      FileUtils.writeByteArrayToFile(keytabFile, keytabContent.get)
-      keytabFile.setExecutable(false)
-      keytabFile.setWritable(false)
-      keytabFile.setReadable(true, true)
-
-      UserGroupInformation.setConfiguration(configuration)
-      UserGroupInformation.loginUserFromKeytab(principal.get, 
keytabFile.getAbsolutePath)
-      keytabFile.delete()
-    }
-    ConnectionFactory.createConnection(configuration)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala
 
b/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala
deleted file mode 100644
index 11a20dc..0000000
--- 
a/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.external.hbase.dsl
-
-import scala.language.implicitConversions
-
-import org.apache.hadoop.conf.Configuration
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.external.hbase.HBaseSink
-import io.gearpump.streaming.dsl.Stream
-import io.gearpump.streaming.dsl.Stream.Sink
-
-/** Create a HBase DSL Sink */
-class HBaseDSLSink[T](stream: Stream[T]) {
-
-  def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, 
description: String)
-    : Stream[T] = {
-    stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, 
description)
-  }
-
-  def writeToHbase(userConfig: UserConfig, configuration: Configuration, 
table: String,
-      parallism: Int, description: String): Stream[T] = {
-    stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, 
userConfig, description)
-  }
-}
-
-object HBaseDSLSink {
-  implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = {
-    new HBaseDSLSink[T](stream)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
 
b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
new file mode 100644
index 0000000..57751a4
--- /dev/null
+++ 
b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/HBaseSink.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.external.hbase
+
+import java.io.{File, ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.{Constants, FileUtils}
+
+class HBaseSink(
+    userconfig: UserConfig, tableName: String, @transient var configuration: 
Configuration)
+  extends DataSink{
+  lazy val connection = HBaseSink.getConnection(userconfig, configuration)
+  lazy val table = connection.getTable(TableName.valueOf(tableName))
+
+  override def open(context: TaskContext): Unit = {}
+
+  def this(userconfig: UserConfig, tableName: String) = {
+    this(userconfig, tableName, HBaseConfiguration.create())
+  }
+
+  def insert(rowKey: String, columnGroup: String, columnName: String, value: 
String): Unit = {
+    insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup),
+      Bytes.toBytes(columnName), Bytes.toBytes(value))
+  }
+
+  def insert(
+      rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], 
value: Array[Byte])
+    : Unit = {
+    val put = new Put(rowKey)
+    put.addColumn(columnGroup, columnName, value)
+    table.put(put)
+  }
+
+  def put(msg: Any): Unit = {
+    msg match {
+      case seq: Seq[Any] =>
+        seq.foreach(put)
+      case tuple: (_, _, _, _) => {
+        tuple._1 match {
+          case str: String => {
+            insert(tuple._1.asInstanceOf[String], 
tuple._2.asInstanceOf[String],
+              tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String])
+          }
+          case byteArray: Array[Byte@unchecked] => {
+            insert(tuple._1.asInstanceOf[Array[Byte]], 
tuple._2.asInstanceOf[Array[Byte]],
+              tuple._3.asInstanceOf[Array[Byte]], 
tuple._4.asInstanceOf[Array[Byte]])
+          }
+          case _ =>
+          // Skip
+        }
+      }
+    }
+  }
+
+  override def write(message: Message): Unit = {
+    put(message.msg)
+  }
+
+  def close(): Unit = {
+    connection.close()
+    table.close()
+  }
+
+  private def writeObject(out: ObjectOutputStream): Unit = {
+    out.defaultWriteObject()
+    configuration.write(out)
+  }
+
+  private def readObject(in: ObjectInputStream): Unit = {
+    in.defaultReadObject()
+    val clientConf = new Configuration(false)
+    clientConf.readFields(in)
+    configuration = HBaseConfiguration.create(clientConf)
+  }
+}
+
+object HBaseSink {
+  val HBASESINK = "hbasesink"
+  val TABLE_NAME = "hbase.table.name"
+  val COLUMN_FAMILY = "hbase.table.column.family"
+  val COLUMN_NAME = "hbase.table.column.name"
+
+  def apply[T](userconfig: UserConfig, tableName: String): HBaseSink = {
+    new HBaseSink(userconfig, tableName)
+  }
+
+  def apply[T](userconfig: UserConfig, tableName: String, configuration: 
Configuration)
+    : HBaseSink = {
+    new HBaseSink(userconfig, tableName, configuration)
+  }
+
+  private def getConnection(userConfig: UserConfig, configuration: 
Configuration): Connection = {
+    if (UserGroupInformation.isSecurityEnabled) {
+      val principal = 
userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
+      val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
+      if (principal.isEmpty || keytabContent.isEmpty) {
+        val errorMsg = s"HBase is security enabled, user should provide 
kerberos principal in " +
+          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file " +
+          s"in ${Constants.GEARPUMP_KEYTAB_FILE}"
+        throw new Exception(errorMsg)
+      }
+      val keytabFile = File.createTempFile("login", ".keytab")
+      FileUtils.writeByteArrayToFile(keytabFile, keytabContent.get)
+      keytabFile.setExecutable(false)
+      keytabFile.setWritable(false)
+      keytabFile.setReadable(true, true)
+
+      UserGroupInformation.setConfiguration(configuration)
+      UserGroupInformation.loginUserFromKeytab(principal.get, 
keytabFile.getAbsolutePath)
+      keytabFile.delete()
+    }
+    ConnectionFactory.createConnection(configuration)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
 
b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
new file mode 100644
index 0000000..2417763
--- /dev/null
+++ 
b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.external.hbase.dsl
+
+import scala.language.implicitConversions
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.external.hbase.HBaseSink
+import org.apache.gearpump.streaming.dsl.Stream
+import org.apache.gearpump.streaming.dsl.Stream.Sink
+
+/** Create a HBase DSL Sink */
+class HBaseDSLSink[T](stream: Stream[T]) {
+
+  def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, 
description: String)
+    : Stream[T] = {
+    stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, 
description)
+  }
+
+  def writeToHbase(userConfig: UserConfig, configuration: Configuration, 
table: String,
+      parallism: Int, description: String): Stream[T] = {
+    stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, 
userConfig, description)
+  }
+}
+
+object HBaseDSLSink {
+  implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = {
+    new HBaseDSLSink[T](stream)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala 
b/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala
deleted file mode 100644
index cad6581..0000000
--- 
a/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.external.hbase
-
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("HBaseSink should insert a row successfully") {
-
-  //  import Mockito._
-  //  val htable = Mockito.mock(classOf[HTable])
-  //  val row = "row"
-  //  val group = "group"
-  //  val name = "name"
-  //  val value = "1.2"
-  //  val put = new Put(Bytes.toBytes(row))
-  //  put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value))
-  //  val hbaseSink = HBaseSink(htable)
-  //  hbaseSink.insert(put)
-  //  verify(htable).put(put)
-
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
 
b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
new file mode 100644
index 0000000..24b9646
--- /dev/null
+++ 
b/external/hbase/src/test/scala/org/apache/gearpump/external/hbase/HBaseSinkSpec.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.external.hbase
+
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers {
+
+  property("HBaseSink should insert a row successfully") {
+
+  //  import Mockito._
+  //  val htable = Mockito.mock(classOf[HTable])
+  //  val row = "row"
+  //  val group = "group"
+  //  val name = "name"
+  //  val value = "1.2"
+  //  val put = new Put(Bytes.toBytes(row))
+  //  put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value))
+  //  val hbaseSink = HBaseSink(htable)
+  //  hbaseSink.insert(put)
+  //  verify(htable).put(put)
+
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala
deleted file mode 100644
index b482c7c..0000000
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka
-
-import java.util.Properties
-
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.ByteArraySerializer
-
-import io.gearpump.Message
-import io.gearpump.streaming.kafka.lib.KafkaUtil
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.task.TaskContext
-
-/**
- * kafka sink connectors that invokes 
org.apache.kafka.clients.producer.KafkaProducer to send
- * messages to kafka queue
- * @param getProducer is a function to construct a KafkaProducer
- * @param topic is the kafka topic to write to
- */
-class KafkaSink private[kafka](
-    getProducer: () => KafkaProducer[Array[Byte], Array[Byte]], topic: String) 
extends DataSink {
-
-  /**
-   * @param topic producer topic
-   * @param properties producer config
-   */
-  def this(topic: String, properties: Properties) = {
-    this(() => KafkaUtil.createKafkaProducer(properties,
-      new ByteArraySerializer, new ByteArraySerializer), topic)
-  }
-
-  /**
-   *
-   * creates an empty properties with `bootstrap.servers` set to 
`bootstrapServers`
-   * and invokes `KafkaSink(topic, properties)`
-   * @param topic producer topic
-   * @param bootstrapServers kafka producer config `bootstrap.servers`
-   */
-  def this(topic: String, bootstrapServers: String) = {
-    this(topic, KafkaUtil.buildProducerConfig(bootstrapServers))
-  }
-
-  // Lazily construct producer since KafkaProducer is not serializable
-  private lazy val producer = getProducer()
-
-  override def open(context: TaskContext): Unit = {}
-
-  override def write(message: Message): Unit = {
-    val record = message.msg match {
-      case (k, v) =>
-        new ProducerRecord[Array[Byte], Array[Byte]](topic, 
k.asInstanceOf[Array[Byte]],
-          v.asInstanceOf[Array[Byte]])
-      case v =>
-        new ProducerRecord[Array[Byte], Array[Byte]](topic, 
v.asInstanceOf[Array[Byte]])
-    }
-    producer.send(record)
-  }
-
-  override def close(): Unit = {
-    producer.close()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
deleted file mode 100644
index 1544445..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka
-
-import java.util.Properties
-import scala.collection.mutable.ArrayBuffer
-import scala.util.{Failure, Success}
-
-import kafka.common.TopicAndPartition
-import org.slf4j.Logger
-
-import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
-import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, 
KafkaOffsetManager, KafkaSourceConfig, KafkaUtil}
-import io.gearpump.streaming.source.DefaultTimeStampFilter
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
-import io.gearpump.streaming.transaction.api._
-import io.gearpump.util.LogUtil
-import io.gearpump.{Message, TimeStamp}
-
-object KafkaSource {
-  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource])
-}
-
-/**
- * Kafka source connectors that pulls a batch of messages 
(`kafka.consumer.emit.batch.size`)
- * from multiple Kafka TopicAndPartition in a round-robin way.
- *
- * This is a TimeReplayableSource which is able to replay messages given a 
start time.
- * Each kafka message is tagged with a timestamp by
- * [[io.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, 
timestamp) mapping
- * is stored to a [[io.gearpump.streaming.transaction.api.OffsetStorage]]. On 
recovery,
- * we could retrieve the previously stored offset from the
- * [[io.gearpump.streaming.transaction.api.OffsetStorage]] by timestamp and 
start to read from
- * there.
- *
- * kafka message is wrapped into gearpump [[io.gearpump.Message]] and further 
filtered by a
- * [[io.gearpump.streaming.transaction.api.TimeStampFilter]]
- * such that obsolete messages are dropped.
- *
- * @param config kafka source config
- * @param offsetStorageFactory factory to build [[OffsetStorage]]
- * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
- * @param timestampFilter filters out message based on timestamp
- * @param fetchThread fetches messages and puts on a in-memory queue
- * @param offsetManagers manages offset-to-timestamp storage for each 
kafka.common.TopicAndPartition
- */
-class KafkaSource(
-    config: KafkaSourceConfig,
-    offsetStorageFactory: OffsetStorageFactory,
-    messageDecoder: MessageDecoder = new DefaultMessageDecoder,
-    timestampFilter: TimeStampFilter = new DefaultTimeStampFilter,
-    private var fetchThread: Option[FetchThread] = None,
-    private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = {
-      Map.empty[TopicAndPartition, KafkaOffsetManager]
-    }) extends TimeReplayableSource {
-  import io.gearpump.streaming.kafka.KafkaSource._
-
-  private var startTime: Option[TimeStamp] = None
-
-  /**
-   * Constructs a Kafka Source by...
-   *
-   * @param topics comma-separated string of topics
-   * @param properties kafka consumer config
-   * @param offsetStorageFactory 
[[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
-   *   that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
-   *
-   */
-  def this(topics: String, properties: Properties, offsetStorageFactory: 
OffsetStorageFactory) = {
-    this(KafkaSourceConfig(properties).withConsumerTopics(topics), 
offsetStorageFactory)
-  }
-  /**
-   * Constructs a Kafka Source by...
-   *
-   * @param topics comma-separated string of topics
-   * @param properties kafka consumer config
-   * @param offsetStorageFactory 
[[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
-   *   that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
-   * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
-   * @param timestampFilter filters out message based on timestamp
-   */
-  def this(topics: String, properties: Properties, offsetStorageFactory: 
OffsetStorageFactory,
-      messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = {
-    this(KafkaSourceConfig(properties)
-      .withConsumerTopics(topics), offsetStorageFactory,
-      messageDecoder, timestampFilter)
-  }
-
-  /**
-   * Constructs a Kafka Source by...
-   *
-   * @param topics comma-separated string of topics
-   * @param zkConnect kafka consumer config `zookeeper.connect`
-   * @param offsetStorageFactory 
[[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
-   *   that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
-   */
-  def this(topics: String, zkConnect: String, offsetStorageFactory: 
OffsetStorageFactory) =
-    this(topics, KafkaUtil.buildConsumerConfig(zkConnect), 
offsetStorageFactory)
-
-  /**
-   * Constructs a Kafka Source by...
-   *
-   * @param topics comma-separated string of topics
-   * @param zkConnect kafka consumer config `zookeeper.connect`
-   * @param offsetStorageFactory 
[[io.gearpump.streaming.transaction.api.OffsetStorageFactory]]
-   *   that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]]
-   * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes
-   * @param timestampFilter filters out message based on timestamp
-   */
-  def this(topics: String, zkConnect: String, offsetStorageFactory: 
OffsetStorageFactory,
-      messageDecoder: MessageDecoder,
-      timestampFilter: TimeStampFilter) = {
-    this(topics, KafkaUtil.buildConsumerConfig(zkConnect), 
offsetStorageFactory,
-      messageDecoder, timestampFilter)
-  }
-
-  LOG.debug(s"assigned ${offsetManagers.keySet}")
-
-  private[kafka] def setStartTime(startTime: Option[TimeStamp]): Unit = {
-    this.startTime = startTime
-    fetchThread.foreach { fetch =>
-      this.startTime.foreach { time =>
-        offsetManagers.foreach { case (tp, offsetManager) =>
-          offsetManager.resolveOffset(time) match {
-            case Success(offset) =>
-              LOG.debug(s"set start offset to $offset for $tp")
-              fetch.setStartOffset(tp, offset)
-            case Failure(StorageEmpty) =>
-              LOG.debug(s"no previous TimeStamp stored")
-            case Failure(e) => throw e
-          }
-        }
-      }
-      fetch.setDaemon(true)
-      fetch.start()
-    }
-  }
-
-  override def open(context: TaskContext, startTime: TimeStamp): Unit = {
-    import context.{appId, appName, parallelism, taskId}
-
-    val topics = config.getConsumerTopics
-    val grouper = config.getGrouper
-    val consumerConfig = config.consumerConfig
-    val topicAndPartitions = grouper.group(parallelism, taskId.index,
-      
KafkaUtil.getTopicAndPartitions(KafkaUtil.connectZookeeper(consumerConfig)(), 
topics))
-    this.fetchThread = Some(FetchThread(topicAndPartitions, 
config.getFetchThreshold,
-      config.getFetchSleepMS, config.getConsumerStartOffset, consumerConfig))
-    this.offsetManagers = topicAndPartitions.map { tp =>
-      val storageTopic = s"app${appId}_${appName}_${tp.topic}_${tp.partition}"
-      val storage = offsetStorageFactory.getOffsetStorage(storageTopic)
-      tp -> new KafkaOffsetManager(storage)
-    }.toMap
-
-    setStartTime(Option(startTime))
-  }
-
-  override def read(): Message = {
-    fetchThread.flatMap(_.poll.flatMap(filterMessage)).orNull
-  }
-
-  private def filterMessage(kafkaMsg: KafkaMessage): Option[Message] = {
-    val msgOpt = offsetManagers(kafkaMsg.topicAndPartition)
-      .filter(messageDecoder.fromBytes(kafkaMsg.msg) -> kafkaMsg.offset)
-    msgOpt.flatMap { msg =>
-      startTime match {
-        case None =>
-          Some(msg)
-        case Some(time) =>
-          timestampFilter.filter(msg, time)
-      }
-    }
-  }
-
-  override def close(): Unit = {
-    offsetManagers.foreach(_._2.close())
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
deleted file mode 100644
index e50bf84..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka
-
-import java.util.Properties
-import scala.collection.mutable
-import scala.util.{Failure, Success, Try}
-
-import com.twitter.bijection.Injection
-import kafka.api.OffsetRequest
-import kafka.consumer.ConsumerConfig
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.kafka.lib.KafkaUtil
-import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer
-import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, 
StorageEmpty, Underflow}
-import io.gearpump.streaming.transaction.api.{OffsetStorage, 
OffsetStorageFactory}
-import io.gearpump.util.LogUtil
-
-/**
- * Factory that builds [[KafkaStorage]]
- *
- * @param consumerProps kafka consumer config
- * @param producerProps kafka producer config
- */
-class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties)
-  extends OffsetStorageFactory {
-
-  /**
-   * Creates consumer config properties with `zookeeper.connect` set to 
zkConnect
-   * and producer config properties with `bootstrap.servers` set to 
bootstrapServers
-   *
-   * @param zkConnect kafka consumer config `zookeeper.connect`
-   * @param bootstrapServers kafka producer config `bootstrap.servers`
-   */
-  def this(zkConnect: String, bootstrapServers: String) =
-    this(KafkaUtil.buildConsumerConfig(zkConnect), 
KafkaUtil.buildProducerConfig(bootstrapServers))
-
-  override def getOffsetStorage(dir: String): OffsetStorage = {
-    val topic = dir
-    val consumerConfig = new ConsumerConfig(consumerProps)
-    val getConsumer = () => KafkaConsumer(topic, 0, 
OffsetRequest.EarliestTime, consumerConfig)
-    new KafkaStorage(topic, KafkaUtil.createKafkaProducer[Array[Byte], 
Array[Byte]](
-      producerProps, new ByteArraySerializer, new ByteArraySerializer),
-      getConsumer(), KafkaUtil.connectZookeeper(consumerConfig)())
-  }
-}
-
-object KafkaStorage {
-  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaStorage])
-}
-
-/**
- * Stores offset-timestamp mapping to kafka
- *
- * @param topic kafka store topic
- * @param producer kafka producer
- * @param getConsumer function to get kafka consumer
- * @param connectZk function to connect zookeeper
- */
-class KafkaStorage private[kafka](
-    topic: String,
-    producer: KafkaProducer[Array[Byte], Array[Byte]],
-    getConsumer: => KafkaConsumer,
-    connectZk: => ZkClient)
-  extends OffsetStorage {
-
-  private lazy val consumer = getConsumer
-
-  private val dataByTime: List[(TimeStamp, Array[Byte])] = {
-    if (KafkaUtil.topicExists(connectZk, topic)) {
-      load(consumer)
-    } else {
-      List.empty[(TimeStamp, Array[Byte])]
-    }
-  }
-
-  /**
-   * Offsets with timestamp less than `time` have already been processed by 
the system
-   * so we look up the storage for the first offset with timestamp large equal 
than `time`
-   * on replay.
-   *
-   * @param time the timestamp to look up for the earliest unprocessed offset
-   * @return the earliest unprocessed offset if `time` is in the range, 
otherwise failure
-   */
-  override def lookUp(time: TimeStamp): Try[Array[Byte]] = {
-    if (dataByTime.isEmpty) {
-      Failure(StorageEmpty)
-    } else {
-      val min = dataByTime.head
-      val max = dataByTime.last
-      if (time < min._1) {
-        Failure(Underflow(min._2))
-      } else if (time > max._1) {
-        Failure(Overflow(max._2))
-      } else {
-        Success(dataByTime.find(_._1 >= time).get._2)
-      }
-    }
-  }
-
-  override def append(time: TimeStamp, offset: Array[Byte]): Unit = {
-    val message = new ProducerRecord[Array[Byte], Array[Byte]](
-      topic, 0, Injection[Long, Array[Byte]](time), offset)
-    producer.send(message)
-  }
-
-  override def close(): Unit = {
-    producer.close()
-    KafkaUtil.deleteTopic(connectZk, topic)
-  }
-
-  private[kafka] def load(consumer: KafkaConsumer): List[(TimeStamp, 
Array[Byte])] = {
-    var messagesBuilder = new mutable.ArrayBuilder.ofRef[(TimeStamp, 
Array[Byte])]
-    while (consumer.hasNext) {
-      val kafkaMsg = consumer.next
-      kafkaMsg.key.map { k =>
-        Injection.invert[TimeStamp, Array[Byte]](k) match {
-          case Success(time) =>
-            messagesBuilder += (time -> kafkaMsg.msg)
-          case Failure(e) => throw e
-        }
-      } orElse (throw new RuntimeException("offset key should not be null"))
-    }
-    consumer.close()
-    messagesBuilder.result().toList
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
deleted file mode 100644
index 2a852e2..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.kafka.dsl
-
-import java.util.Properties
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.dsl
-import io.gearpump.streaming.kafka.KafkaSink
-
-class KafkaDSLSink[T](stream: dsl.Stream[T]) {
-
-  /** Create a Kafka DSL Sink */
-  def writeToKafka(
-      topic: String,
-      bootstrapServers: String,
-      parallism: Int,
-      description: String): dsl.Stream[T] = {
-    stream.sink(new KafkaSink(topic, bootstrapServers), parallism, 
UserConfig.empty, description)
-  }
-
-  def writeToKafka(
-      topic: String,
-      properties: Properties,
-      parallism: Int,
-      description: String): dsl.Stream[T] = {
-    stream.sink(new KafkaSink(topic, properties), parallism, UserConfig.empty, 
description)
-  }
-}
-
-object KafkaDSLSink {
-
-  import scala.language.implicitConversions
-
-  implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] 
= {
-    new KafkaDSLSink[T](stream)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
deleted file mode 100644
index 325b40f..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.kafka.dsl
-
-import java.util.Properties
-
-import io.gearpump.streaming.dsl
-import io.gearpump.streaming.dsl.StreamApp
-import io.gearpump.streaming.kafka.KafkaSource
-import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, 
KafkaSourceConfig}
-import io.gearpump.streaming.transaction.api.{MessageDecoder, 
OffsetStorageFactory, TimeStampFilter}
-
-object KafkaDSLUtil {
-  def createStream[T](
-      app: StreamApp,
-      parallelism: Int,
-      description: String,
-      kafkaConfig: KafkaSourceConfig,
-      offsetStorageFactory: OffsetStorageFactory,
-      messageDecoder: MessageDecoder = new DefaultMessageDecoder): 
dsl.Stream[T] = {
-    app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, 
messageDecoder),
-      parallelism, description)
-  }
-
-  def createStream[T](
-      app: StreamApp,
-      parallelism: Int,
-      description: String,
-      topics: String,
-      zkConnect: String,
-      offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory),
-      parallelism, description)
-  }
-
-  def createStream[T](
-      app: StreamApp,
-      parallelism: Int,
-      description: String,
-      topics: String,
-      zkConnect: String,
-      offsetStorageFactory: OffsetStorageFactory,
-      messageDecoder: MessageDecoder,
-      timestampFilter: TimeStampFilter): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory,
-    messageDecoder, timestampFilter), parallelism, description)
-  }
-
-  def createStream[T](
-      app: StreamApp,
-      parallelism: Int,
-      description: String,
-      topics: String,
-      properties: Properties,
-      offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory),
-    parallelism, description)
-  }
-
-  def createStream[T](
-      app: StreamApp,
-      topics: String,
-      parallelism: Int,
-      description: String,
-      properties: Properties,
-      offsetStorageFactory: OffsetStorageFactory,
-      messageDecoder: MessageDecoder,
-      timestampFilter: TimeStampFilter): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, properties, offsetStorageFactory,
-    messageDecoder, timestampFilter), parallelism, description)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
deleted file mode 100644
index f846efe..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import scala.util.{Failure, Success}
-
-import com.twitter.bijection.Injection
-
-import io.gearpump.Message
-import io.gearpump.streaming.transaction.api.MessageDecoder
-
-class DefaultMessageDecoder extends MessageDecoder {
-  override def fromBytes(bytes: Array[Byte]): Message = {
-    Message(bytes, System.currentTimeMillis())
-  }
-}
-
-class StringMessageDecoder extends MessageDecoder {
-  override def fromBytes(bytes: Array[Byte]): Message = {
-    Injection.invert[String, Array[Byte]](bytes) match {
-      case Success(s) => Message(s, System.currentTimeMillis())
-      case Failure(e) => throw e
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
deleted file mode 100644
index e9c95e3..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import scala.util.{Failure, Success, Try}
-
-import com.twitter.bijection.Injection
-import org.slf4j.Logger
-
-import io.gearpump._
-import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, 
StorageEmpty, Underflow}
-import io.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage}
-import io.gearpump.util.LogUtil
-
-object KafkaOffsetManager {
-  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager])
-}
-
-private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends 
OffsetManager {
-  import io.gearpump.streaming.kafka.lib.KafkaOffsetManager._
-
-  var maxTime: TimeStamp = 0L
-
-  override def filter(messageAndOffset: (Message, Long)): Option[Message] = {
-    val (message, offset) = messageAndOffset
-    if (message.timestamp > maxTime) {
-      maxTime = message.timestamp
-      storage.append(maxTime, Injection[Long, Array[Byte]](offset))
-    }
-    Some(message)
-  }
-
-  override def resolveOffset(time: TimeStamp): Try[Long] = {
-    storage.lookUp(time) match {
-      case Success(offset) => Injection.invert[Long, Array[Byte]](offset)
-      case Failure(Overflow(max)) =>
-        LOG.warn(s"start time larger than the max stored TimeStamp; set to max 
offset")
-        Injection.invert[Long, Array[Byte]](max)
-      case Failure(Underflow(min)) =>
-        LOG.warn(s"start time less than the min stored TimeStamp; set to min 
offset")
-        Injection.invert[Long, Array[Byte]](min)
-      case Failure(StorageEmpty) => Failure(StorageEmpty)
-      case Failure(e) => throw e
-    }
-  }
-
-  override def close(): Unit = {
-    storage.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
deleted file mode 100644
index 123f3ac..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import java.util.Properties
-
-import kafka.api.OffsetRequest
-import kafka.consumer.ConsumerConfig
-import org.slf4j.Logger
-
-import io.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, 
KafkaGrouper}
-import io.gearpump.util.LogUtil
-
-object KafkaSourceConfig {
-
-  val NAME = "kafka_config"
-
-  val ZOOKEEPER_CONNECT = "zookeeper.connect"
-  val GROUP_ID = "group.id"
-  val CONSUMER_START_OFFSET = "kafka.consumer.start.offset"
-  val CONSUMER_TOPICS = "kafka.consumer.topics"
-  val FETCH_THRESHOLD = "kafka.consumer.fetch.threshold"
-  val FETCH_SLEEP_MS = "kafka.consumer.fetch.sleep.ms"
-  val GROUPER_CLASS = "kafka.grouper.class"
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  def apply(consumerProps: Properties): KafkaSourceConfig = new 
KafkaSourceConfig(consumerProps)
-}
-
-/**
- * Extends kafka.consumer.ConsumerConfig with specific config needed by
- * [[io.gearpump.streaming.kafka.KafkaSource]]
- *
- * @param consumerProps kafka consumer config
- */
-class KafkaSourceConfig(val consumerProps: Properties = new Properties)
-  extends java.io.Serializable {
-  import io.gearpump.streaming.kafka.lib.KafkaSourceConfig._
-
-  if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) {
-    consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181")
-  }
-
-  if (!consumerProps.containsKey(GROUP_ID)) {
-    consumerProps.setProperty(GROUP_ID, "gearpump")
-  }
-
-  def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps)
-
-  /**
-   * Set kafka consumer topics, seperated by comma.
-   *
-   * @param topics comma-separated string
-   * @return new KafkaConfig based on this but with
-   *         
[[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]]
-   *         set to given value
-   */
-  def withConsumerTopics(topics: String): KafkaSourceConfig = {
-    consumerProps.setProperty(CONSUMER_TOPICS, topics)
-    KafkaSourceConfig(consumerProps)
-  }
-
-  /**
-   * Returns a list of kafka consumer topics
-   */
-  def getConsumerTopics: List[String] = {
-    
Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList
-  }
-
-  /**
-   * Sets the sleep interval if there are no more message or message buffer is 
full.
-   *
-   * Consumer.FetchThread will sleep for a while if no more messages or
-   * the incoming queue size is above the
-   * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]
-   *
-   * @param sleepMS sleep interval in milliseconds
-   * @return new KafkaConfig based on this but with
-   *         
[[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]] set to 
given value
-   */
-  def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = {
-    consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS")
-    KafkaSourceConfig(consumerProps)
-  }
-
-  /**
-   * Gets the sleep interval
-   *
-   * Consumer.FetchThread sleeps for a while if no more messages or
-   * the incoming queue is full (size is bigger than the
-   * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]])
-   *
-   * @return sleep interval in milliseconds
-   */
-  def getFetchSleepMS: Int = {
-    Option(consumerProps.getProperty(FETCH_SLEEP_MS)).getOrElse("100").toInt
-  }
-
-  /**
-   * Sets the batch size we use for one fetch.
-   *
-   * Consumer.FetchThread stops fetching new messages if its incoming queue
-   * size is above the threshold and starts again when the queue size is below 
it
-   *
-   * @param threshold queue size
-   * @return new KafkaConfig based on this but with
-   *         
[[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] set to 
give value
-   */
-  def withFetchThreshold(threshold: Int): KafkaSourceConfig = {
-    consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold")
-    KafkaSourceConfig(consumerProps)
-  }
-
-  /**
-   * Returns fetch batch size.
-   *
-   * Consumer.FetchThread stops fetching new messages if
-   * its incoming queue size is above the threshold and starts again when the 
queue size is below it
-   *
-   * @return fetch threshold
-   */
-  def getFetchThreshold: Int = {
-    Option(consumerProps.getProperty(FETCH_THRESHOLD)).getOrElse("10000").toInt
-  }
-
-  /**
-   * Sets [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which
-   * defines how kafka.common.TopicAndPartitions are mapped to source tasks.
-   *
-   * @param className name of the factory class
-   * @return new KafkaConfig based on this but with
-   *         
[[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]] set to 
given value
-   */
-  def withGrouper(className: String): KafkaSourceConfig = {
-    consumerProps.setProperty(GROUPER_CLASS, className)
-    KafkaSourceConfig(consumerProps)
-  }
-
-  /**
-   * Returns [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] 
instance, which
-   * defines how kafka.common.TopicAndPartitions are mapped to source tasks
-   */
-  def getGrouper: KafkaGrouper = {
-    Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS))
-      
.getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper]
-  }
-
-  def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = {
-    consumerProps.setProperty(CONSUMER_START_OFFSET, s"$earliestOrLatest")
-    KafkaSourceConfig(consumerProps)
-  }
-
-  def getConsumerStartOffset: Long = {
-    Option(consumerProps.getProperty(CONSUMER_START_OFFSET))
-      .getOrElse(s"${OffsetRequest.EarliestTime}").toLong
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
deleted file mode 100644
index 2f7fcf7..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib
-
-import java.io.InputStream
-import java.util.Properties
-
-import kafka.admin.AdminUtils
-import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
-import kafka.consumer.ConsumerConfig
-import kafka.utils.{ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
-import org.apache.kafka.common.serialization.Serializer
-import org.slf4j.Logger
-
-import io.gearpump.util.LogUtil
-
-object KafkaUtil {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  def getBroker(connectZk: => ZkClient, topic: String, partition: Int): Broker 
= {
-    val zkClient = connectZk
-    try {
-      val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-        .getOrElse(throw new RuntimeException(
-          s"leader not available for TopicAndPartition($topic, $partition)"))
-      ZkUtils.getBrokerInfo(zkClient, leader)
-        .getOrElse(throw new RuntimeException(s"broker info not found for 
leader $leader"))
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-        throw e
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: 
List[String])
-    : Array[TopicAndPartition] = {
-    val zkClient = connectZk
-    try {
-      ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
-        case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
-      }.toArray
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-        throw e
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def topicExists(connectZk: => ZkClient, topic: String): Boolean = {
-    val zkClient = connectZk
-    try {
-      AdminUtils.topicExists(zkClient, topic)
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-        throw e
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  /**
-   * create a new kafka topic
-   * return true if topic already exists, and false otherwise
-   */
-  def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, 
replicas: Int)
-    : Boolean = {
-    val zkClient = connectZk
-    try {
-      if (AdminUtils.topicExists(zkClient, topic)) {
-        LOG.info(s"topic $topic exists")
-        true
-      } else {
-        AdminUtils.createTopic(zkClient, topic, partitions, replicas)
-        LOG.info(s"created topic $topic")
-        false
-      }
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-        throw e
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def deleteTopic(connectZk: => ZkClient, topic: String): Unit = {
-    val zkClient = connectZk
-    try {
-      AdminUtils.deleteTopic(zkClient, topic)
-    } catch {
-      case e: Exception =>
-        LOG.error(e.getMessage)
-    } finally {
-      zkClient.close()
-    }
-  }
-
-  def connectZookeeper(config: ConsumerConfig): () => ZkClient = {
-    val zookeeperConnect = config.zkConnect
-    val sessionTimeout = config.zkSessionTimeoutMs
-    val connectionTimeout = config.zkConnectionTimeoutMs
-    () => new ZkClient(zookeeperConnect, sessionTimeout, connectionTimeout, 
ZKStringSerializer)
-  }
-
-  def loadProperties(filename: String): Properties = {
-    val props = new Properties()
-    var propStream: InputStream = null
-    try {
-      propStream = getClass.getClassLoader.getResourceAsStream(filename)
-      props.load(propStream)
-    } catch {
-      case e: Exception =>
-        LOG.error(s"$filename not found")
-    } finally {
-      if (propStream != null) {
-        propStream.close()
-      }
-    }
-    props
-  }
-
-  def createKafkaProducer[K, V](properties: Properties,
-      keySerializer: Serializer[K],
-      valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
-    if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == 
null) {
-      properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092")
-    }
-    new KafkaProducer[K, V](properties, keySerializer, valueSerializer)
-  }
-
-  def buildProducerConfig(bootstrapServers: String): Properties = {
-    val properties = new Properties()
-    properties.setProperty("bootstrap.servers", bootstrapServers)
-    properties
-  }
-
-  def buildConsumerConfig(zkConnect: String): Properties = {
-    val properties = new Properties()
-    properties.setProperty("zookeeper.connect", zkConnect)
-    properties.setProperty("group.id", "gearpump")
-    properties
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
deleted file mode 100644
index 141ae98..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib.consumer
-
-/**
- * someone sleeps for exponentially increasing duration each time
- * until the cap
- *
- * @param backOffMultiplier The factor by which the duration increases.
- * @param initialDurationMs Time in milliseconds for initial sleep.
- * @param maximumDurationMs Cap up to which we will increase the duration.
- */
-private[consumer] class ExponentialBackoffSleeper(
-    backOffMultiplier: Double = 2.0,
-    initialDurationMs: Long = 100,
-    maximumDurationMs: Long = 10000) {
-
-  require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
-  require(initialDurationMs > 0, "initialDurationMs must be positive")
-  require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be 
>= initialDurationMs")
-
-  private var sleepDuration = initialDurationMs
-
-  def reset(): Unit = {
-    sleepDuration = initialDurationMs
-  }
-
-  def sleep(): Unit = {
-    Thread.sleep(sleepDuration)
-    setNextSleepDuration()
-  }
-
-  def getSleepDuration: Long = sleepDuration
-
-  def setNextSleepDuration(): Unit = {
-    val next = (sleepDuration * backOffMultiplier).asInstanceOf[Long]
-    sleepDuration = math.min(math.max(initialDurationMs, next), 
maximumDurationMs)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
deleted file mode 100644
index 8dbe145..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib.consumer
-
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.LinkedBlockingQueue
-
-import kafka.common.TopicAndPartition
-import kafka.consumer.ConsumerConfig
-import org.slf4j.Logger
-
-import io.gearpump.util.LogUtil
-
-object FetchThread {
-  private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])
-
-  def apply(topicAndPartitions: Array[TopicAndPartition],
-      fetchThreshold: Int,
-      fetchSleepMS: Long,
-      startOffsetTime: Long,
-      consumerConfig: ConsumerConfig): FetchThread = {
-    val createConsumer = (tp: TopicAndPartition) =>
-      KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)
-
-    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-    new FetchThread(topicAndPartitions, createConsumer, incomingQueue, 
fetchThreshold, fetchSleepMS)
-  }
-}
-
-/**
- * A thread to fetch messages from multiple kafka 
org.apache.kafka.TopicAndPartition and puts them
- * onto a queue, which is asynchronously polled by a consumer
- *
- * @param createConsumer given a org.apache.kafka.TopicAndPartition, create a
- *                       
[[io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to
- *                       connect to it
- * @param incomingQueue a queue to buffer incoming messages
- * @param fetchThreshold above which thread should stop fetching messages
- * @param fetchSleepMS interval to sleep when no more messages or hitting 
fetchThreshold
- */
-private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
-    createConsumer: TopicAndPartition => KafkaConsumer,
-    incomingQueue: LinkedBlockingQueue[KafkaMessage],
-    fetchThreshold: Int,
-    fetchSleepMS: Long) extends Thread {
-  import io.gearpump.streaming.kafka.lib.consumer.FetchThread._
-
-  private var consumers: Map[TopicAndPartition, KafkaConsumer] = 
createAllConsumers
-
-  def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
-    consumers(tp).setStartOffset(startOffset)
-  }
-
-  def poll: Option[KafkaMessage] = {
-    Option(incomingQueue.poll())
-  }
-
-  override def run(): Unit = {
-    try {
-      var nextOffsets = Map.empty[TopicAndPartition, Long]
-      var reset = false
-      val sleeper = new ExponentialBackoffSleeper(
-        backOffMultiplier = 2.0,
-        initialDurationMs = 100L,
-        maximumDurationMs = 10000L)
-      while (!Thread.currentThread().isInterrupted) {
-        try {
-          if (reset) {
-            nextOffsets = consumers.mapValues(_.getNextOffset)
-            resetConsumers(nextOffsets)
-            reset = false
-          }
-          val hasMoreMessages = fetchMessage
-          sleeper.reset()
-          if (!hasMoreMessages) {
-            Thread.sleep(fetchSleepMS)
-          }
-        } catch {
-          case exception: Exception =>
-            LOG.warn(s"resetting consumers due to $exception")
-            reset = true
-            sleeper.sleep()
-        }
-      }
-    } catch {
-      case e: InterruptedException => LOG.info("fetch thread got interrupted 
exception")
-      case e: ClosedByInterruptException => LOG.info("fetch thread closed by 
interrupt exception")
-    } finally {
-      consumers.values.foreach(_.close())
-    }
-  }
-
-  /**
-   * fetch message from each TopicAndPartition in a round-robin way
-   */
-  def fetchMessage: Boolean = {
-    consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
-      val (_, consumer) = tpAndConsumer
-      if (incomingQueue.size < fetchThreshold) {
-        if (consumer.hasNext) {
-          incomingQueue.put(consumer.next())
-          true
-        } else {
-          hasNext
-        }
-      } else {
-        true
-      }
-    }
-  }
-
-  private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = {
-    topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap
-  }
-
-  private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit 
= {
-    consumers.values.foreach(_.close())
-    consumers = createAllConsumers
-    consumers.foreach { case (tp, consumer) =>
-      consumer.setStartOffset(nextOffsets(tp))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
deleted file mode 100644
index 77321b9..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib.consumer
-
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
-import kafka.common.ErrorMapping._
-import kafka.common.TopicAndPartition
-import kafka.consumer.{ConsumerConfig, SimpleConsumer}
-import kafka.message.MessageAndOffset
-import kafka.utils.Utils
-
-import io.gearpump.streaming.kafka.lib.KafkaUtil
-
-object KafkaConsumer {
-  def apply(topic: String, partition: Int, startOffsetTime: Long, config: 
ConsumerConfig)
-    : KafkaConsumer = {
-    val connectZk = KafkaUtil.connectZookeeper(config)
-    val broker = KafkaUtil.getBroker(connectZk(), topic, partition)
-    val soTimeout = config.socketTimeoutMs
-    val soBufferSize = config.socketReceiveBufferBytes
-    val fetchSize = config.fetchMessageMaxBytes
-    val clientId = config.clientId
-    val consumer = new SimpleConsumer(broker.host, broker.port, soTimeout, 
soBufferSize, clientId)
-    val getIterator = (offset: Long) => {
-      val request = new FetchRequestBuilder()
-        .addFetch(topic, partition, offset, fetchSize)
-        .build()
-
-      val response = consumer.fetch(request)
-      response.errorCode(topic, partition) match {
-        case NoError => response.messageSet(topic, partition).iterator
-        case error => throw exceptionFor(error)
-      }
-    }
-    new KafkaConsumer(consumer, topic, partition, getIterator, startOffsetTime)
-  }
-}
-
-/**
- * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over
- * messages from a kafka kafka.common.TopicAndPartition.
- */
-class KafkaConsumer(consumer: SimpleConsumer,
-    topic: String,
-    partition: Int,
-    getIterator: (Long) => Iterator[MessageAndOffset],
-    startOffsetTime: Long = OffsetRequest.EarliestTime) {
-  private val earliestOffset = consumer
-    .earliestOrLatestOffset(TopicAndPartition(topic, partition), 
startOffsetTime, -1)
-  private var nextOffset: Long = earliestOffset
-  private var iterator: Iterator[MessageAndOffset] = getIterator(nextOffset)
-
-  def setStartOffset(startOffset: Long): Unit = {
-    nextOffset = startOffset
-    iterator = getIterator(nextOffset)
-  }
-
-  def next(): KafkaMessage = {
-    val mo = iterator.next()
-    val message = mo.message
-
-    nextOffset = mo.nextOffset
-
-    val offset = mo.offset
-    val payload = Utils.readBytes(message.payload)
-    new KafkaMessage(topic, partition, offset, 
Option(message.key).map(Utils.readBytes), payload)
-  }
-
-  def hasNext: Boolean = {
-    @annotation.tailrec
-    def hasNextHelper(iter: Iterator[MessageAndOffset], newIterator: Boolean): 
Boolean = {
-      if (iter.hasNext) true
-      else if (newIterator) false
-      else {
-        iterator = getIterator(nextOffset)
-        hasNextHelper(iterator, newIterator = true)
-      }
-    }
-    hasNextHelper(iterator, newIterator = false)
-  }
-
-  def getNextOffset: Long = nextOffset
-
-  def close(): Unit = {
-    consumer.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
deleted file mode 100644
index 16330ed..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib.consumer
-
-import kafka.common.TopicAndPartition
-
-/**
- * wrapper over messages from kafka
- * @param topicAndPartition where message comes from
- * @param offset message offset on kafka queue
- * @param key message key, could be None
- * @param msg message payload
- */
-case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long,
-    key: Option[Array[Byte]], msg: Array[Byte]) {
-
-  def this(topic: String, partition: Int, offset: Long,
-      key: Option[Array[Byte]], msg: Array[Byte]) = {
-    this(TopicAndPartition(topic, partition), offset, key, msg)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
deleted file mode 100644
index 0f968e2..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib.grouper
-
-import kafka.common.TopicAndPartition
-
-/**
- * default grouper groups TopicAndPartitions among StreamProducers by 
partitions
- *
- * e.g. given 2 topics (topicA with 2 partitions and topicB with 3 partitions) 
and
- * 2 streamProducers (streamProducer0 and streamProducer1)
- *
- * streamProducer0 gets (topicA, partition1), (topicB, partition1) and 
(topicA, partition3)
- * streamProducer1 gets (topicA, partition2), (topicB, partition2)
- */
-class KafkaDefaultGrouper extends KafkaGrouper {
-  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: 
Array[TopicAndPartition])
-    : Array[TopicAndPartition] = {
-    topicAndPartitions.indices.filter(_ % taskNum == taskIndex)
-      .map(i => topicAndPartitions(i)).toArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
deleted file mode 100644
index 6660a04..0000000
--- 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.kafka.lib.grouper
-
-import kafka.common.TopicAndPartition
-
-/**
- * this class dispatches kafka kafka.common.TopicAndPartition to gearpump tasks
- */
-trait KafkaGrouper {
-  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: 
Array[TopicAndPartition])
-    : Array[TopicAndPartition]
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala
new file mode 100644
index 0000000..cb90f93
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka
+
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.kafka.lib.KafkaUtil
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ * kafka sink connectors that invokes 
org.apache.kafka.clients.producer.KafkaProducer to send
+ * messages to kafka queue
+ * @param getProducer is a function to construct a KafkaProducer
+ * @param topic is the kafka topic to write to
+ */
+class KafkaSink private[kafka](
+    getProducer: () => KafkaProducer[Array[Byte], Array[Byte]], topic: String) 
extends DataSink {
+
+  /**
+   * @param topic producer topic
+   * @param properties producer config
+   */
+  def this(topic: String, properties: Properties) = {
+    this(() => KafkaUtil.createKafkaProducer(properties,
+      new ByteArraySerializer, new ByteArraySerializer), topic)
+  }
+
+  /**
+   *
+   * creates an empty properties with `bootstrap.servers` set to 
`bootstrapServers`
+   * and invokes `KafkaSink(topic, properties)`
+   * @param topic producer topic
+   * @param bootstrapServers kafka producer config `bootstrap.servers`
+   */
+  def this(topic: String, bootstrapServers: String) = {
+    this(topic, KafkaUtil.buildProducerConfig(bootstrapServers))
+  }
+
+  // Lazily construct producer since KafkaProducer is not serializable
+  private lazy val producer = getProducer()
+
+  override def open(context: TaskContext): Unit = {}
+
+  override def write(message: Message): Unit = {
+    val record = message.msg match {
+      case (k, v) =>
+        new ProducerRecord[Array[Byte], Array[Byte]](topic, 
k.asInstanceOf[Array[Byte]],
+          v.asInstanceOf[Array[Byte]])
+      case v =>
+        new ProducerRecord[Array[Byte], Array[Byte]](topic, 
v.asInstanceOf[Array[Byte]])
+    }
+    producer.send(record)
+  }
+
+  override def close(): Unit = {
+    producer.close()
+  }
+}
+

Reply via email to