Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 8939a2d42 -> 56ac28642


fix GEARPUMP-160, add KafkaDSL examples and fix docs

Author: manuzhang <[email protected]>

Closes #41 from manuzhang/kafka_doc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/56ac2864
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/56ac2864
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/56ac2864

Branch: refs/heads/master
Commit: 56ac28642bc030e140dbad022a6a164e55ec23f9
Parents: 8939a2d
Author: manuzhang <[email protected]>
Authored: Mon Jun 20 16:36:58 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Mon Jun 20 16:36:58 2016 +0800

----------------------------------------------------------------------
 docs/dev-connectors.md                          | 132 +++++++++++++------
 docs/js/main.js                                 |  12 +-
 examples/streaming/kafka/README.md              |   2 +-
 .../examples/kafka/dsl/KafkaReadWrite.scala     |  81 ++++++++++++
 external/kafka/README.md                        |   2 +-
 .../streaming/kafka/util/KafkaConfig.java       |  24 +---
 .../gearpump/streaming/kafka/dsl/KafkaDSL.scala | 100 ++++++++++++++
 .../streaming/kafka/dsl/KafkaDSLSink.scala      |  46 -------
 .../streaming/kafka/dsl/KafkaDSLUtil.scala      |  37 ------
 9 files changed, 287 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/docs/dev-connectors.md
----------------------------------------------------------------------
diff --git a/docs/dev-connectors.md b/docs/dev-connectors.md
index 2060c93..676c0b4 100644
--- a/docs/dev-connectors.md
+++ b/docs/dev-connectors.md
@@ -7,14 +7,11 @@ title: Gearpump Connectors
 `DataSource` and `DataSink` are the two main concepts Gearpump use to connect 
with the outside world.
 
 ### DataSource
-`DataSource` is the concept in Gearpump that without input and will output 
messages. So, basically, `DataSource` is the start point of a streaming 
processing flow.
+`DataSource` is the starting point of a streaming processing flow.
 
-As Gearpump depends on `DataSource` to be replay-able to ensure at-least-once 
message delivery and exactly-once message delivery, for some data sources, we 
will need a 
`org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory` to store 
the offset (progress) of current `DataSource`. So that, when a replay is 
needed, Gearpump can guide `DataSource` to replay from certain offset.
-
-Currently Gearpump `DataSource` only support infinite stream. Finite stream 
support will be added in a near future release.
 
 ### DataSink
-`DataSink` is the concept that without output but will consume messages. So, 
`Sink` is the end point of a streaming processing flow.
+`DataSink` is the end point of a streaming processing flow.
 
 ## Implemented Connectors
 
@@ -36,72 +33,125 @@ Name | Description
 
 ## Use of Connectors
 
-### Use of `KafkaSource`
-To use `kafkaSource` in your application, you first need to add the 
`gearpump-external-Kafka` library dependency in your application:
+### Use of Kafka connectors
+
+To use Kafka connectors in your application, first add the `gearpump-external-kafka` library dependency:
 
+<div class="codetabs">
+<div data-lang="sbt" data-label="sbt" markdown="1" >
 ```
-"com.github.intel-hadoop" %% "gearpump-external-kafka" % {{ site.GEARPUMP_VERSION }}
+"org.apache.gearpump" %% "gearpump-external-kafka" % {{ site.GEARPUMP_VERSION }}
 ```
+</div>
 
+<div data-lang="xml" data-label="xml" markdown="1">
 ```xml
 <dependency>
-  <groupId>com.github.intel-hadoop</groupId>
+  <groupId>org.apache.gearpump</groupId>
   <artifactId>gearpump-external-kafka</artifactId>
   <version>{{ site.GEARPUMP_VERSION }}</version>
 </dependency>
 ```
+</div>
+</div>
 
-To connect to Kafka, you need to provide following info:
- - the Zookeeper address
- - the Kafka topic
+Here is a simple example that reads from Kafka and writes the messages back using `KafkaSource` and `KafkaSink`. Users can optionally set a `CheckpointStoreFactory` so that Kafka offsets are checkpointed and at-least-once message delivery is guaranteed.
 
-Then, you can use `KafkaSource` in your application:
+<div class="codetabs">
+<div data-lang="scala" data-label="Processor API" markdown="1">
+```scala
+    val appConfig = UserConfig.empty
+    val props = new Properties
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
+    val source = new KafkaSource(sourceTopic, props)
+    val checkpointStoreFactory = new KafkaStoreFactory(props)
+    source.setCheckpointStore(checkpointStoreFactory)
+    val sourceProcessor = DataSourceProcessor(source, sourceNum)
+    val sink = new KafkaSink(sinkTopic, props)
+    val sinkProcessor = DataSinkProcessor(sink, sinkNum)
+    val partitioner = new ShufflePartitioner
+    val computation = sourceProcessor ~ partitioner ~> sinkProcessor
+    val app = StreamApplication(appName, Graph(computation), appConfig)
+```
+</div>
 
+<div data-lang="scala" data-label="Stream DSL" markdown="1">
 ```scala
+    val props = new Properties
+    val appName = "KafkaDSL"
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
+
+    val app = StreamApp(appName, context)
+
+    if (atLeastOnce) {
+      val checkpointStoreFactory = new KafkaStoreFactory(props)
+      KafkaDSL.createAtLeastOnceStream(app, sourceTopic, checkpointStoreFactory, props, sourceNum)
+          .writeToKafka(sinkTopic, props, sinkNum)
+    } else {
+      KafkaDSL.createAtMostOnceStream(app, sourceTopic, props, sourceNum)
+          .writeToKafka(sinkTopic, props, sinkNum)
+    }
+```
+</div>
+</div>
 
-   //Specify the offset storage.
-   //Here we use the same zookeeper as the offset storage.
-   //A set of corresponding topics will be created to store the offsets.
-   //You are free to specify your own offset storage
-   val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
+In the above example, configurations are set through Java properties and shared by `KafkaSource`, `KafkaSink` and `KafkaCheckpointStoreFactory`.
+Each of them can also be configured individually with the options listed below.
 
-   //create the kafka data source
-   val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
+#### `KafkaSource` configurations
 
-   //create Gearpump Processor
-   val reader = DataSourceProcessor(source, parallelism)
-```
+Name | Description | Type | Default
+---- | ----------- | ---- | -------
+`KafkaConfig.ZOOKEEPER_CONNECT_CONFIG` | Zookeeper connect string for Kafka topic management | String |
+`KafkaConfig.CLIENT_ID_CONFIG` | An id string to pass to the server when making requests | String | ""
+`KafkaConfig.GROUP_ID_CONFIG` | A string that uniquely identifies a set of consumers within the same consumer group | String | ""
+`KafkaConfig.FETCH_SLEEP_MS_CONFIG` | The amount of time (ms) to sleep when hitting fetch.threshold | Int | 100
+`KafkaConfig.FETCH_THRESHOLD_CONFIG` | Size of internal queue to keep Kafka messages. Fetching stops and the fetch thread sleeps when the threshold is hit | Int | 10000
+`KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG` | Partition grouper class to group partitions among source tasks | Class | DefaultPartitionGrouper
+`KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG` | Message decoder class to decode raw bytes from Kafka | Class | DefaultMessageDecoder
+`KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG` | Timestamp filter class to filter out late messages | Class | DefaultTimeStampFilter
 
-```scala
 
-  //specify the offset storage
-  //here we use the same zookeeper as the offset storage (a set of 
corresponding topics will be created to store the offsets)
-  //you are free to specify your own offset storage
-  val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
+#### `KafkaSink` configurations
 
-  val source = KafkaDSLUtil.createStream(app, parallelism, "Kafka Source", 
topics, zookeepers, offsetStorageFactory)
-  ...
-```
+Name | Description | Type | Default
+---- | ----------- | ---- | -------
+`KafkaConfig.BOOTSTRAP_SERVERS_CONFIG` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | String |
+`KafkaConfig.CLIENT_ID_CONFIG` | An id string to pass to the server when making requests | String | ""
+
+#### `KafkaCheckpointStoreFactory` configurations
+
+Name | Description | Type | Default
+---- | ----------- | ---- | -------
+`KafkaConfig.ZOOKEEPER_CONNECT_CONFIG` | Zookeeper connect string for Kafka topic management | String |
+`KafkaConfig.BOOTSTRAP_SERVERS_CONFIG` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | String |
+`KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG` | Name prefix for checkpoint store | String | ""
+`KafkaConfig.REPLICATION_FACTOR_CONFIG` | Replication factor for checkpoint store topic | Int | 1
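The settings in the three tables above can be collected once in a single `java.util.Properties` instance and shared, as the earlier examples do. Below is a minimal sketch; the literal key strings are illustrative ("checkpoint.store.name.prefix" and "fetch.threshold" match the constants defined in `KafkaConfig.java`, the others follow Kafka's standard names), and application code should prefer the `KafkaConfig.*_CONFIG` constants:

```scala
import java.util.Properties

// One Properties object carries the shared settings; KafkaSource, KafkaSink
// and KafkaCheckpointStoreFactory each pick out the keys they care about.
// Key strings below are illustrative stand-ins for KafkaConfig constants.
val props = new Properties
props.put("zookeeper.connect", "localhost:2181")
props.put("bootstrap.servers", "localhost:9092")
props.put("checkpoint.store.name.prefix", "KafkaDSL")

// Keys left unset fall back to the defaults in the tables above,
// e.g. fetch.threshold defaults to 10000.
val fetchThreshold =
  Option(props.getProperty("fetch.threshold")).getOrElse("10000")
```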
 
 ### Use of `HBaseSink`
 
 To use `HBaseSink` in your application, you first need to add the 
`gearpump-external-hbase` library dependency in your application:
 
 ```
-"com.github.intel-hadoop" %% "gearpump-external-hbase" % {{ site.GEARPUMP_VERSION }}
+"org.apache.gearpump" %% "gearpump-external-hbase" % {{ site.GEARPUMP_VERSION }}
 ```
 
 ```xml
 <dependency>
-  <groupId>com.github.intel-hadoop</groupId>
+  <groupId>org.apache.gearpump</groupId>
   <artifactId>gearpump-external-hbase</artifactId>
   <version>{{ site.GEARPUMP_VERSION }}</version>
 </dependency>
 ```
 
 To connect to HBase, you need to provide following info:
- - the HBase configuration to tell which HBase service to connect
- - the table name (you must create the table yourself, see the [HBase documentation](https://hbase.apache.org/book.html))
+  
+  * the HBase configuration to tell which HBase service to connect
+  * the table name (you must create the table yourself, see the [HBase documentation](https://hbase.apache.org/book.html))
 
 Then, you can use `HBaseSink` in your application:
 
@@ -153,16 +203,14 @@ Below is some code snippet from `KafkaDSLUtil`:
 
 ```scala
 object KafkaDSLUtil {
-  //T is the message type
-  def createStream[T: ClassTag](
+
+  def createStream[T](
       app: StreamApp,
+      topics: String,
       parallelism: Int,
       description: String,
-      topics: String,
-      zkConnect: String,
-      offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory)
-        with TypedDataSource[T], parallelism, description)
+      properties: Properties): dsl.Stream[T] = {
+    app.source[T](new KafkaSource(topics, properties), parallelism, description)
   }
 }
 ```

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/docs/js/main.js
----------------------------------------------------------------------
diff --git a/docs/js/main.js b/docs/js/main.js
index ae989af..f70bbc1 100644
--- a/docs/js/main.js
+++ b/docs/js/main.js
@@ -38,15 +38,19 @@ function codeTabs() {
       $(this).addClass("tab-pane");
       var lang = $(this).data("lang");
       var image = $(this).data("image");
+      var label = $(this).data("label");
       var notabs = $(this).data("notabs");
-      var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1);
+      if (label == null) {
+        var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1);
+        label = capitalizedLang;
+      }
       lang = lang.replace(/ /g, '');
-      var id = "tab_" + lang + "_" + counter;
+      var id = "tab_" + label.replace(/ /g, '_') + "_" + counter;
       $(this).attr("id", id);
       if (image != null && langImages[lang]) {
-        var buttonLabel = "<img src='" + langImages[lang] + "' alt='" + capitalizedLang + "' />";
+        var buttonLabel = "<img src='" + langImages[lang] + "' alt='" + label + "' />";
       } else if (notabs == null) {
-        var buttonLabel = "<b>" + capitalizedLang + "</b>";
+        var buttonLabel = "<b>" + label + "</b>";
       } else {
         var buttonLabel = ""
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/examples/streaming/kafka/README.md
----------------------------------------------------------------------
diff --git a/examples/streaming/kafka/README.md 
b/examples/streaming/kafka/README.md
index 587271c..1ae4036 100644
--- a/examples/streaming/kafka/README.md
+++ b/examples/streaming/kafka/README.md
@@ -81,7 +81,7 @@ Change directory into gearpump root, build gearpump with `sbt 
pack` and launch a
 Finally, let's run the KafkaWordCount example.
 
    ```bash
-   ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount
+   bin/gear app -jar examples/kafka-$VERSION-assembly.jar org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount
    ```
 
 One more step is to verify that we've succeeded in producing data to Kafka.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
new file mode 100644
index 0000000..49d3619
--- /dev/null
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.kafka.dsl
+
+import java.util.Properties
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser}
+import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.kafka.KafkaStoreFactory
+import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL
+import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL._
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.util.AkkaApp
+
+object KafkaReadWrite extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "source" -> CLIOption[Int]("<how many kafka source tasks>", required = false,
+      defaultValue = Some(1)),
+    "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
+      defaultValue = Some(1)),
+    "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false,
+      defaultValue = Some("localhost:2181")),
+    "brokerList" -> CLIOption[String]("<broker server list string>", required = false,
+      defaultValue = Some("localhost:9092")),
+    "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false,
+      defaultValue = Some("topic1")),
+    "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false,
+      defaultValue = Some("topic2")),
+    "atLeastOnce" -> CLIOption[Boolean]("<turn on at least once source>", required = false,
+      defaultValue = Some(true))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val sourceNum = config.getInt("source")
+    val sinkNum = config.getInt("sink")
+    val zookeeperConnect = config.getString("zookeeperConnect")
+    val brokerList = config.getString("brokerList")
+    val sourceTopic = config.getString("sourceTopic")
+    val sinkTopic = config.getString("sinkTopic")
+    val atLeastOnce = config.getBoolean("atLeastOnce")
+    val props = new Properties
+    val appName = "KafkaDSL"
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
+
+    val context = ClientContext(akkaConf)
+    val app = StreamApp(appName, context)
+
+    if (atLeastOnce) {
+      val checkpointStoreFactory = new KafkaStoreFactory(props)
+      KafkaDSL.createAtLeastOnceStream(app, sourceTopic, checkpointStoreFactory, props, sourceNum)
+        .writeToKafka(sinkTopic, props, sinkNum)
+    } else {
+      KafkaDSL.createAtMostOnceStream(app, sourceTopic, props, sourceNum)
+        .writeToKafka(sinkTopic, props, sinkNum)
+    }
+
+    context.submit(app)
+    context.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/README.md
----------------------------------------------------------------------
diff --git a/external/kafka/README.md b/external/kafka/README.md
index 5adca88..e3e09a0 100644
--- a/external/kafka/README.md
+++ b/external/kafka/README.md
@@ -1,3 +1,3 @@
 Kafka Source and Sink.
 
-Check example at: https://github.com/intel-hadoop/gearpump/tree/master/examples/streaming/kafka
+Check example at: https://github.com/apache/incubator-gearpump/tree/master/examples/streaming/kafka

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
index 403f213..0d5bec7 100644
--- 
a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
+++ 
b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
@@ -23,10 +23,8 @@ import kafka.common.TopicAndPartition;
 import kafka.consumer.ConsumerConfig;
 import org.apache.gearpump.streaming.kafka.lib.source.DefaultMessageDecoder;
 import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient;
-import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread;
 import 
org.apache.gearpump.streaming.kafka.lib.source.grouper.DefaultPartitionGrouper;
 import org.apache.gearpump.streaming.source.DefaultTimeStampFilter;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 
@@ -64,7 +62,7 @@ public class KafkaConfig extends AbstractConfig implements 
Serializable {
 
   public static final String GROUP_ID_CONFIG = "group.id";
   public static final String GROUP_ID_DOC =
-      "a string that uniquely identifies a set of consumers within the same consumer group";
+      "A string that uniquely identifies a set of consumers within the same consumer group";
 
   public static final String ENABLE_AUTO_COMMIT_CONFIG = "auto.commit.enable";
   public static final String ENABLE_AUTO_COMMIT_DOC =
@@ -72,11 +70,11 @@ public class KafkaConfig extends AbstractConfig implements 
Serializable {
 
   /** KafkaSource specific configs */
   public static final String CONSUMER_START_OFFSET_CONFIG = 
"consumer.start.offset";
-  private static final String CONSUMER_START_OFFSET_DOC = "kafka offset to start consume from. "
+  private static final String CONSUMER_START_OFFSET_DOC = "Kafka offset to start consume from. "
       + "This will be overwritten when checkpoint recover takes effect.";
 
   public static final String FETCH_THRESHOLD_CONFIG = "fetch.threshold";
-  private static final String FETCH_THRESHOLD_DOC = "kafka messages are fetched asynchronously "
+  private static final String FETCH_THRESHOLD_DOC = "Kafka messages are fetched asynchronously "
      + "and put onto a internal queue. When the number of messages in the queue hit the threshold,"
      + "the fetch thread stops fetching, and goes to sleep. It starts fetching again when the"
      + "number falls below the threshold";
@@ -102,7 +100,7 @@ public class KafkaConfig extends AbstractConfig implements 
Serializable {
       "The replication factor for checkpoint store topic.";
 
   public static final String CHECKPOINT_STORE_NAME_PREFIX_CONFIG = 
"checkpoint.store.name.prefix";
-  public static final String CHECKPOINT_STORE_NAME_PREFIX_DOC = "name prefix for checkpoint "
+  public static final String CHECKPOINT_STORE_NAME_PREFIX_DOC = "Name prefix for checkpoint "
      + "store whose name will be of the form, namePrefix-sourceTopic-partitionId";
 
   static {
@@ -184,12 +182,6 @@ public class KafkaConfig extends AbstractConfig implements 
Serializable {
     return tp.topic() + "-" + tp.partition();
   }
 
-  public ConsumerConfig getConsumerConfig() {
-    Properties props = getBaseConsumerConfigs();
-
-    return new ConsumerConfig(props);
-  }
-
   public Properties getProducerConfig() {
     Properties props = new Properties();
     props.putAll(this.originals());
@@ -211,12 +203,8 @@ public class KafkaConfig extends AbstractConfig implements 
Serializable {
     return KafkaClient.factory();
   }
 
-  public FetchThread.FetchThreadFactory getFetchThreadFactory() {
-    return FetchThread.factory();
-  }
-
 
-  private Properties getBaseConsumerConfigs() {
+  public ConsumerConfig getConsumerConfig() {
     Properties props = new Properties();
     props.putAll(this.originals());
 
@@ -232,7 +220,7 @@ public class KafkaConfig extends AbstractConfig implements 
Serializable {
     }
     props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
 
-    return props;
+    return new ConsumerConfig(props);
   }
 
   private void removeSourceSpecificConfigs(Properties props) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
new file mode 100644
index 0000000..f1bb26a
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka.dsl
+
+import java.util.Properties
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl
+import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+
+object KafkaDSL {
+
+  /**
+   * Creates stream from Kafka where Kafka offsets are not checkpointed
+   * @param app stream application
+   * @param topics Kafka source topics
+   * @param properties Kafka configurations
+   * @param parallelism number of source tasks
+   * @param config task configurations
+   * @param description descriptions to mark source on dashboard
+   * @return a stream reading data from Kafka
+   */
+  def createAtMostOnceStream[T](
+      app: StreamApp,
+      topics: String,
+      properties: Properties,
+      parallelism: Int = 1,
+      config: UserConfig = UserConfig.empty,
+      description: String = "KafkaSource"
+      ): dsl.Stream[T] = {
+    app.source[T](new KafkaSource(topics, properties), parallelism, config, description)
+  }
+
+  /**
+   * Creates stream from Kafka where Kafka offsets are checkpointed with timestamp
+   * @param app stream application
+   * @param topics Kafka source topics
+   * @param checkpointStoreFactory factory to build checkpoint store
+   * @param properties Kafka configurations
+   * @param parallelism number of source tasks
+   * @param config task configurations
+   * @param description descriptions to mark source on dashboard
+   * @return a stream reading data from Kafka
+   */
+  def createAtLeastOnceStream[T](
+      app: StreamApp,
+      topics: String,
+      checkpointStoreFactory: CheckpointStoreFactory,
+      properties: Properties,
+      parallelism: Int = 1,
+      config: UserConfig = UserConfig.empty,
+      description: String = "KafkaSource"): dsl.Stream[T] = {
+    val source = new KafkaSource(topics, properties)
+    source.setCheckpointStore(checkpointStoreFactory)
+    app.source[T](source, parallelism, config, description)
+  }
+
+  import scala.language.implicitConversions
+  implicit def streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = {
+    new KafkaDSL[T](stream)
+  }
+}
+
+class KafkaDSL[T](stream: dsl.Stream[T]) {
+
+  /**
+   * Sinks data to Kafka
+   * @param topic Kafka sink topic
+   * @param properties Kafka configurations
+   * @param parallelism number of sink tasks
+   * @param userConfig task configurations
+   * @param description descriptions to mark sink on dashboard
+   * @return a stream writing data to Kafka
+   */
+  def writeToKafka(
+      topic: String,
+      properties: Properties,
+      parallelism: Int = 1,
+      userConfig: UserConfig = UserConfig.empty,
+      description: String = "KafkaSink"): dsl.Stream[T] = {
+    stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
deleted file mode 100644
index b34149f..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.kafka.dsl
-
-import java.util.Properties
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl
-import org.apache.gearpump.streaming.kafka.KafkaSink
-
-class KafkaDSLSink[T](stream: dsl.Stream[T]) {
-
-  /** Create a Kafka DSL Sink */
-  def writeToKafka(
-      topic: String,
-      properties: Properties,
-      parallelism: Int = 1,
-      userConfig: UserConfig = UserConfig.empty,
-      description: String = null): dsl.Stream[T] = {
-    stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, 
description)
-  }
-}
-
-object KafkaDSLSink {
-
-  import scala.language.implicitConversions
-
-  implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] 
= {
-    new KafkaDSLSink[T](stream)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
deleted file mode 100644
index 874d691..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.kafka.dsl
-
-import java.util.Properties
-
-import org.apache.gearpump.streaming.dsl
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.kafka.KafkaSource
-
-object KafkaDSLUtil {
-
-  def createStream[T](
-      app: StreamApp,
-      topics: String,
-      parallelism: Int,
-      description: String,
-      properties: Properties): dsl.Stream[T] = {
-    app.source[T](new KafkaSource(topics, properties), parallelism, 
description)
-  }
-}
-
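The new `KafkaDSL` keeps `writeToKafka` from the deleted `KafkaDSLSink`, attaching it to `dsl.Stream` through an implicit conversion (`streamToKafkaDSL` above). Here is a self-contained sketch of that extension-method pattern; `MiniStream`, `KafkaOps` and `writeTo` are hypothetical stand-ins for Gearpump's `dsl.Stream`, `KafkaDSL` and `writeToKafka`:

```scala
object MiniDSL {
  // Hypothetical stand-in for Gearpump's dsl.Stream.
  final case class MiniStream[T](elems: List[T])

  // Mirrors the KafkaDSL class: wraps a stream and adds a write method.
  final class KafkaOps[T](stream: MiniStream[T]) {
    // Stand-in for writeToKafka: "writes" and returns the stream for chaining.
    def writeTo(topic: String): MiniStream[T] = {
      println(s"writing ${stream.elems.size} elements to $topic")
      stream
    }
  }

  // Mirrors `implicit def streamToKafkaDSL`: any MiniStream gains writeTo.
  import scala.language.implicitConversions
  implicit def toKafkaOps[T](s: MiniStream[T]): KafkaOps[T] = new KafkaOps[T](s)
}

import MiniDSL._
val out = MiniStream(List(1, 2, 3)).writeTo("topic2")
// prints: writing 3 elements to topic2
```

The implicit conversion is what lets application code call `stream.writeToKafka(...)` directly, without wrapping the stream by hand.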
