Repository: incubator-gearpump Updated Branches: refs/heads/master 8939a2d42 -> 56ac28642
fix GEARPUMP-160, add KafkaDSL examples and fix docs Author: manuzhang <[email protected]> Closes #41 from manuzhang/kafka_doc. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/56ac2864 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/56ac2864 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/56ac2864 Branch: refs/heads/master Commit: 56ac28642bc030e140dbad022a6a164e55ec23f9 Parents: 8939a2d Author: manuzhang <[email protected]> Authored: Mon Jun 20 16:36:58 2016 +0800 Committer: manuzhang <[email protected]> Committed: Mon Jun 20 16:36:58 2016 +0800 ---------------------------------------------------------------------- docs/dev-connectors.md | 132 +++++++++++++------ docs/js/main.js | 12 +- examples/streaming/kafka/README.md | 2 +- .../examples/kafka/dsl/KafkaReadWrite.scala | 81 ++++++++++++ external/kafka/README.md | 2 +- .../streaming/kafka/util/KafkaConfig.java | 24 +--- .../gearpump/streaming/kafka/dsl/KafkaDSL.scala | 100 ++++++++++++++ .../streaming/kafka/dsl/KafkaDSLSink.scala | 46 ------- .../streaming/kafka/dsl/KafkaDSLUtil.scala | 37 ------ 9 files changed, 287 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/docs/dev-connectors.md ---------------------------------------------------------------------- diff --git a/docs/dev-connectors.md b/docs/dev-connectors.md index 2060c93..676c0b4 100644 --- a/docs/dev-connectors.md +++ b/docs/dev-connectors.md @@ -7,14 +7,11 @@ title: Gearpump Connectors `DataSource` and `DataSink` are the two main concepts Gearpump use to connect with the outside world. ### DataSource -`DataSource` is the concept in Gearpump that without input and will output messages. So, basically, `DataSource` is the start point of a streaming processing flow. 
+`DataSource` is the start point of a streaming processing flow. -As Gearpump depends on `DataSource` to be replay-able to ensure at-least-once message delivery and exactly-once message delivery, for some data sources, we will need a `org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory` to store the offset (progress) of current `DataSource`. So that, when a replay is needed, Gearpump can guide `DataSource` to replay from certain offset. - -Currently Gearpump `DataSource` only support infinite stream. Finite stream support will be added in a near future release. ### DataSink -`DataSink` is the concept that without output but will consume messages. So, `Sink` is the end point of a streaming processing flow. +`DataSink` is the end point of a streaming processing flow. ## Implemented Connectors @@ -36,72 +33,125 @@ Name | Description ## Use of Connectors -### Use of `KafkaSource` -To use `kafkaSource` in your application, you first need to add the `gearpump-external-Kafka` library dependency in your application: +### Use of Kafka connectors + +To use Kafka connectors in your application, you first need to add the `gearpump-external-kafka` library dependency in your application: +<div class="codetabs"> +<div data-lang="sbt" data-label="sbt" markdown="1" > ``` -"com.github.intel-hadoop" %% "gearpump-external-kafka" % {{ site.GEARPUMP_VERSION }} +"org.apache.gearpump" %% "gearpump-external-kafka" % {{ site.GEARPUMP_VERSION }} ``` +</div> +<div data-lang="xml" data-label="xml" markdown="1"> ```xml <dependency> - <groupId>com.github.intel-hadoop</groupId> + <groupId>org.apache.gearpump</groupId> <artifactId>gearpump-external-kafka</artifactId> <version>{{ site.GEARPUMP_VERSION }}</version> </dependency> ``` +</div> +</div> -To connect to Kafka, you need to provide following info: - - the Zookeeper address - - the Kafka topic +This is a simple example to read from Kafka and write it back using `KafkaSource` and `KafkaSink`. 
Users can optionally set a `CheckpointStoreFactory` such that Kafka offsets are checkpointed and at-least-once message delivery is guaranteed. -Then, you can use `KafkaSource` in your application: +<div class="codetabs"> +<div data-lang="scala" data-label="Processor API" markdown="1"> +```scala + val appConfig = UserConfig.empty + val props = new Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName) + val source = new KafkaSource(sourceTopic, props) + val checkpointStoreFactory = new KafkaStoreFactory(props) + source.setCheckpointStore(checkpointStoreFactory) + val sourceProcessor = DataSourceProcessor(source, sourceNum) + val sink = new KafkaSink(sinkTopic, props) + val sinkProcessor = DataSinkProcessor(sink, sinkNum) + val partitioner = new ShufflePartitioner + val computation = sourceProcessor ~ partitioner ~> sinkProcessor + val app = StreamApplication(appName, Graph(computation), appConfig) +``` +</div> +<div data-lang="scala" data-label="Stream DSL" markdown="1"> ```scala + val props = new Properties + val appName = "KafkaDSL" + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName) + + val app = StreamApp(appName, context) + + if (atLeastOnce) { + val checkpointStoreFactory = new KafkaStoreFactory(props) + KafkaDSL.createAtLeastOnceStream(app, sourceTopic, checkpointStoreFactory, props, sourceNum) + .writeToKafka(sinkTopic, props, sinkNum) + } else { + KafkaDSL.createAtMostOnceStream(app, sourceTopic, props, sourceNum) + .writeToKafka(sinkTopic, props, sinkNum) + } +``` +</div> +</div> - //Specify the offset storage. - //Here we use the same zookeeper as the offset storage. - //A set of corresponding topics will be created to store the offsets. 
- //You are free to specify your own offset storage
- val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
+In the above example, configurations are set through Java properties and shared by `KafkaSource`, `KafkaSink` and `KafkaCheckpointStoreFactory`.
+Their configurations can also be defined individually, as listed below.
- //create the kafka data source
- val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
+#### `KafkaSource` configurations
- //create Gearpump Processor
- val reader = DataSourceProcessor(source, parallelism)
-```
+Name | Descriptions | Type | Default
+---- | ------------ | ---- | -------
+`KafkaConfig.ZOOKEEPER_CONNECT_CONFIG` | Zookeeper connect string for Kafka topics management | String |
+`KafkaConfig.CLIENT_ID_CONFIG` | An id string to pass to the server when making requests | String | ""
+`KafkaConfig.GROUP_ID_CONFIG` | A string that uniquely identifies a set of consumers within the same consumer group | String | ""
+`KafkaConfig.FETCH_SLEEP_MS_CONFIG` | The amount of time (ms) to sleep when hitting fetch.threshold | Int | 100
+`KafkaConfig.FETCH_THRESHOLD_CONFIG` | Size of internal queue to keep Kafka messages.
Stop fetching and go to sleep when hitting the threshold | Int | 10000
+`KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG` | Partition grouper class to group partitions among source tasks | Class | DefaultPartitionGrouper
+`KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG` | Message decoder class to decode raw bytes from Kafka | Class | DefaultMessageDecoder
+`KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG` | Timestamp filter class to filter out late messages | Class | DefaultTimeStampFilter
-```scala
- //specify the offset storage
- //here we use the same zookeeper as the offset storage (a set of corresponding topics will be created to store the offsets)
- //you are free to specify your own offset storage
- val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
+#### `KafkaSink` configurations
- val source = KafkaDSLUtil.createStream(app, parallelism, "Kafka Source", topics, zookeepers, offsetStorageFactory)
- ...
-```
+Name | Descriptions | Type | Default
+---- | ------------ | ---- | -------
+`KafkaConfig.BOOTSTRAP_SERVERS_CONFIG` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | String |
+`KafkaConfig.CLIENT_ID_CONFIG` | An id string to pass to the server when making requests | String | ""
+
+#### `KafkaCheckpointStoreFactory` configurations
+
+Name | Descriptions | Type | Default
+---- | ------------ | ---- | -------
+`KafkaConfig.ZOOKEEPER_CONNECT_CONFIG` | Zookeeper connect string for Kafka topics management | String |
+`KafkaConfig.BOOTSTRAP_SERVERS_CONFIG` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster | String |
+`KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG` | Name prefix for checkpoint store | String | ""
+`KafkaConfig.REPLICATION_FACTOR` | Replication factor for checkpoint store topic | Int | 1

### Use of `HBaseSink`
To use `HBaseSink` in your application, you first need to add the `gearpump-external-hbase` library dependency in your application:

```
-"com.github.intel-hadoop" %% "gearpump-external-hbase" % {{ site.GEARPUMP_VERSION }} +"org.apache.gearpump" %% "gearpump-external-hbase" % {{ site.GEARPUMP_VERSION }} ``` ```xml <dependency> - <groupId>com.github.intel-hadoop</groupId> + <groupId>org.apache.gearpump</groupId> <artifactId>gearpump-external-hbase</artifactId> <version>{{ site.GEARPUMP_VERSION }}</version> </dependency> ``` To connect to HBase, you need to provide following info: - - the HBase configuration to tell which HBase service to connect - - the table name (you must create the table yourself, see the [HBase documentation](https://hbase.apache.org/book.html)) + + * the HBase configuration to tell which HBase service to connect + * the table name (you must create the table yourself, see the [HBase documentation](https://hbase.apache.org/book.html)) Then, you can use `HBaseSink` in your application: @@ -153,16 +203,14 @@ Below is some code snippet from `KafkaDSLUtil`: ```scala object KafkaDSLUtil { - //T is the message type - def createStream[T: ClassTag]( + + def createStream[T]( app: StreamApp, + topics: String, parallelism: Int, description: String, - topics: String, - zkConnect: String, - offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory) - with TypedDataSource[T], parallelism, description) + properties: Properties): dsl.Stream[T] = { + app.source[T](new KafkaSource(topics, properties), parallelism, description) } } ``` http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/docs/js/main.js ---------------------------------------------------------------------- diff --git a/docs/js/main.js b/docs/js/main.js index ae989af..f70bbc1 100644 --- a/docs/js/main.js +++ b/docs/js/main.js @@ -38,15 +38,19 @@ function codeTabs() { $(this).addClass("tab-pane"); var lang = $(this).data("lang"); var image = $(this).data("image"); + var label = $(this).data("label"); var notabs = $(this).data("notabs"); - 
var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1); + if (label == null) { + var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1); + label = capitalizedLang; + } lang = lang.replace(/ /g, ''); - var id = "tab_" + lang + "_" + counter; + var id = "tab_" + label.replace(/ /g, '_') + "_" + counter; $(this).attr("id", id); if (image != null && langImages[lang]) { - var buttonLabel = "<img src='" + langImages[lang] + "' alt='" + capitalizedLang + "' />"; + var buttonLabel = "<img src='" + langImages[lang] + "' alt='" + label + "' />"; } else if (notabs == null) { - var buttonLabel = "<b>" + capitalizedLang + "</b>"; + var buttonLabel = "<b>" + label + "</b>"; } else { var buttonLabel = "" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/examples/streaming/kafka/README.md ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/README.md b/examples/streaming/kafka/README.md index 587271c..1ae4036 100644 --- a/examples/streaming/kafka/README.md +++ b/examples/streaming/kafka/README.md @@ -81,7 +81,7 @@ Change directory into gearpump root, build gearpump with `sbt pack` and launch a Finally, let's run the KafkaWordCount example. ```bash - ./target/pack/bin/gear app -jar ./examples/target/$SCALA_VERSION_MAJOR/gearpump-examples-assembly-$VERSION.jar org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount + bin/gear app -jar examples/kafka-$VERSION-assembly.jar org.apache.gearpump.streaming.examples.kafka.wordcount.KafkaWordCount ``` One more step is to verify that we've succeeded in producing data to Kafka. 
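The `data-label` handling added to `codeTabs()` above boils down to two rules: a tab's button text uses the explicit `data-label` when present and otherwise falls back to the capitalized `data-lang`, and the tab id is derived from the label with spaces replaced by underscores. A standalone sketch of those rules (re-implemented here outside jQuery, for illustration only):

```javascript
// Sketch of the label fallback and tab-id rules introduced by the
// codeTabs() change above (standalone re-implementation for illustration).
function tabLabel(lang, label) {
  // Fall back to the capitalized language name when no data-label is given.
  if (label == null) {
    label = lang.substr(0, 1).toUpperCase() + lang.substr(1);
  }
  return label;
}

function tabId(label, counter) {
  // Tab ids are derived from the label, with spaces replaced by underscores.
  return "tab_" + label.replace(/ /g, '_') + "_" + counter;
}

console.log(tabLabel("scala", null));          // "Scala"
console.log(tabLabel("scala", "Stream DSL"));  // "Stream DSL"
console.log(tabId("Stream DSL", 0));           // "tab_Stream_DSL_0"
```

This is what lets the dev-connectors.md page above show two Scala tabs labeled "Processor API" and "Stream DSL" instead of two identical "Scala" buttons.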
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala new file mode 100644 index 0000000..49d3619 --- /dev/null +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.streaming.examples.kafka.dsl
+
+import java.util.Properties
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser}
+import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.kafka.KafkaStoreFactory
+import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL
+import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL._
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.util.AkkaApp
+
+object KafkaReadWrite extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "source" -> CLIOption[Int]("<how many kafka producer tasks>", required = false,
+      defaultValue = Some(1)),
+    "sink" -> CLIOption[Int]("<how many kafka processor tasks>", required = false,
+      defaultValue = Some(1)),
+    "zookeeperConnect" -> CLIOption[String]("<zookeeper connect string>", required = false,
+      defaultValue = Some("localhost:2181")),
+    "brokerList" -> CLIOption[String]("<broker server list string>", required = false,
+      defaultValue = Some("localhost:9092")),
+    "sourceTopic" -> CLIOption[String]("<kafka source topic>", required = false,
+      defaultValue = Some("topic1")),
+    "sinkTopic" -> CLIOption[String]("<kafka sink topic>", required = false,
+      defaultValue = Some("topic2")),
+    "atLeastOnce" -> CLIOption[Boolean]("<turn on at least once source>", required = false,
+      defaultValue = Some(true))
+  )
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val sourceNum = config.getInt("source")
+    val sinkNum = config.getInt("sink")
+    val zookeeperConnect = config.getString("zookeeperConnect")
+    val brokerList = config.getString("brokerList")
+    val sourceTopic = config.getString("sourceTopic")
+    val sinkTopic = config.getString("sinkTopic")
+    val atLeastOnce = config.getBoolean("atLeastOnce")
+    val props = new Properties
+    val appName =
"KafkaDSL" + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName) + + val context = ClientContext(akkaConf) + val app = StreamApp(appName, context) + + if (atLeastOnce) { + val checkpointStoreFactory = new KafkaStoreFactory(props) + KafkaDSL.createAtLeastOnceStream(app, sourceTopic, checkpointStoreFactory, props, sourceNum) + .writeToKafka(sinkTopic, props, sinkNum) + } else { + KafkaDSL.createAtMostOnceStream(app, sourceTopic, props, sourceNum) + .writeToKafka(sinkTopic, props, sinkNum) + } + + context.submit(app) + context.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/README.md ---------------------------------------------------------------------- diff --git a/external/kafka/README.md b/external/kafka/README.md index 5adca88..e3e09a0 100644 --- a/external/kafka/README.md +++ b/external/kafka/README.md @@ -1,3 +1,3 @@ Kafka Source and Sink. 
-Check example at: https://github.com/intel-hadoop/gearpump/tree/master/examples/streaming/kafka +Check example at: https://github.com/apache/incubator-gearpump/tree/master/examples/streaming/kafka http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java index 403f213..0d5bec7 100644 --- a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java +++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java @@ -23,10 +23,8 @@ import kafka.common.TopicAndPartition; import kafka.consumer.ConsumerConfig; import org.apache.gearpump.streaming.kafka.lib.source.DefaultMessageDecoder; import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient; -import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread; import org.apache.gearpump.streaming.kafka.lib.source.grouper.DefaultPartitionGrouper; import org.apache.gearpump.streaming.source.DefaultTimeStampFilter; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -64,7 +62,7 @@ public class KafkaConfig extends AbstractConfig implements Serializable { public static final String GROUP_ID_CONFIG = "group.id"; public static final String GROUP_ID_DOC = - "a string that uniquely identifies a set of consumers within the same consumer group"; + "A string that uniquely identifies a set of consumers within the same consumer group"; public static final String ENABLE_AUTO_COMMIT_CONFIG = "auto.commit.enable"; public static final String ENABLE_AUTO_COMMIT_DOC = @@ -72,11 +70,11 @@ public class 
KafkaConfig extends AbstractConfig implements Serializable { /** KafkaSource specific configs */ public static final String CONSUMER_START_OFFSET_CONFIG = "consumer.start.offset"; - private static final String CONSUMER_START_OFFSET_DOC = "kafka offset to start consume from. " + private static final String CONSUMER_START_OFFSET_DOC = "Kafka offset to start consume from. " + "This will be overwritten when checkpoint recover takes effect."; public static final String FETCH_THRESHOLD_CONFIG = "fetch.threshold"; - private static final String FETCH_THRESHOLD_DOC = "kafka messages are fetched asynchronously " + private static final String FETCH_THRESHOLD_DOC = "Kafka messages are fetched asynchronously " + "and put onto a internal queue. When the number of messages in the queue hit the threshold," + "the fetch thread stops fetching, and goes to sleep. It starts fetching again when the" + "number falls below the threshold"; @@ -102,7 +100,7 @@ public class KafkaConfig extends AbstractConfig implements Serializable { "The replication factor for checkpoint store topic."; public static final String CHECKPOINT_STORE_NAME_PREFIX_CONFIG = "checkpoint.store.name.prefix"; - public static final String CHECKPOINT_STORE_NAME_PREFIX_DOC = "name prefix for checkpoint " + public static final String CHECKPOINT_STORE_NAME_PREFIX_DOC = "Name prefix for checkpoint " + "store whose name will be of the form, namePrefix-sourceTopic-partitionId"; static { @@ -184,12 +182,6 @@ public class KafkaConfig extends AbstractConfig implements Serializable { return tp.topic() + "-" + tp.partition(); } - public ConsumerConfig getConsumerConfig() { - Properties props = getBaseConsumerConfigs(); - - return new ConsumerConfig(props); - } - public Properties getProducerConfig() { Properties props = new Properties(); props.putAll(this.originals()); @@ -211,12 +203,8 @@ public class KafkaConfig extends AbstractConfig implements Serializable { return KafkaClient.factory(); } - public 
FetchThread.FetchThreadFactory getFetchThreadFactory() { - return FetchThread.factory(); - } - - private Properties getBaseConsumerConfigs() { + public ConsumerConfig getConsumerConfig() { Properties props = new Properties(); props.putAll(this.originals()); @@ -232,7 +220,7 @@ public class KafkaConfig extends AbstractConfig implements Serializable { } props.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); - return props; + return new ConsumerConfig(props); } private void removeSourceSpecificConfigs(Properties props) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala new file mode 100644 index 0000000..f1bb26a --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.kafka.dsl + +import java.util.Properties + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl +import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource} +import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory + +object KafkaDSL { + + /** + * Creates stream from Kafka where Kafka offsets are not checkpointed + * @param app stream application + * @param topics Kafka source topics + * @param properties Kafka configurations + * @param parallelism number of source tasks + * @param config task configurations + * @param description descriptions to mark source on dashboard + * @return a stream reading data from Kafka + */ + def createAtMostOnceStream[T]( + app: StreamApp, + topics: String, + properties: Properties, + parallelism: Int = 1, + config: UserConfig = UserConfig.empty, + description: String = "KafkaSource" + ): dsl.Stream[T] = { + app.source[T](new KafkaSource(topics, properties), parallelism, config, description) + } + + /** + * Creates stream from Kafka where Kafka offsets are checkpointed with timestamp + * @param app stream application + * @param topics Kafka source topics + * @param checkpointStoreFactory factory to build checkpoint store + * @param properties Kafka configurations + * @param parallelism number of source tasks + * @param config task configurations + * @param description descriptions to mark source on dashboard + * @return a stream reading data from Kafka + */ + def createAtLeastOnceStream[T]( + app: StreamApp, + topics: String, + checkpointStoreFactory: CheckpointStoreFactory, + properties: Properties, + parallelism: Int = 1, + config: UserConfig = UserConfig.empty, + description: String = "KafkaSource"): dsl.Stream[T] = { + val source = new KafkaSource(topics, properties) + source.setCheckpointStore(checkpointStoreFactory) + app.source[T](source, parallelism, config, description) 
+ } + + import scala.language.implicitConversions + implicit def streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = { + new KafkaDSL[T](stream) + } +} + +class KafkaDSL[T](stream: dsl.Stream[T]) { + + /** + * Sinks data to Kafka + * @param topic Kafka sink topic + * @param properties Kafka configurations + * @param parallelism number of sink tasks + * @param userConfig task configurations + * @param description descriptions to mark sink on dashboard + * @return a stream writing data to Kafka + */ + def writeToKafka( + topic: String, + properties: Properties, + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "KafkaSink"): dsl.Stream[T] = { + stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala deleted file mode 100644 index b34149f..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.kafka.dsl - -import java.util.Properties - -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl -import org.apache.gearpump.streaming.kafka.KafkaSink - -class KafkaDSLSink[T](stream: dsl.Stream[T]) { - - /** Create a Kafka DSL Sink */ - def writeToKafka( - topic: String, - properties: Properties, - parallelism: Int = 1, - userConfig: UserConfig = UserConfig.empty, - description: String = null): dsl.Stream[T] = { - stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description) - } -} - -object KafkaDSLSink { - - import scala.language.implicitConversions - - implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] = { - new KafkaDSLSink[T](stream) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/56ac2864/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala deleted file mode 100644 index 874d691..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.kafka.dsl - -import java.util.Properties - -import org.apache.gearpump.streaming.dsl -import org.apache.gearpump.streaming.dsl.StreamApp -import org.apache.gearpump.streaming.kafka.KafkaSource - -object KafkaDSLUtil { - - def createStream[T]( - app: StreamApp, - topics: String, - parallelism: Int, - description: String, - properties: Properties): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, properties), parallelism, description) - } -} -
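A closing note on the checkpoint store configuration shown earlier: `KafkaConfig`'s `CHECKPOINT_STORE_NAME_PREFIX_DOC` states that a checkpoint store name takes the form `namePrefix-sourceTopic-partitionId`, and the topic/partition suffix in KafkaConfig.java is built as `topic + "-" + partition`. A standalone sketch of that naming convention (the helper function name here is hypothetical, not part of the API):

```javascript
// Naming convention for checkpoint store topics, per the KafkaConfig docs
// above: "namePrefix-sourceTopic-partitionId". The function name is
// hypothetical; the real suffix logic lives in KafkaConfig.java.
function checkpointStoreName(namePrefix, topic, partition) {
  return namePrefix + "-" + topic + "-" + partition;
}

console.log(checkpointStoreName("KafkaDSL", "topic1", 0)); // "KafkaDSL-topic1-0"
```

This is why the examples set `CHECKPOINT_STORE_NAME_PREFIX_CONFIG` to the application name: each source topic partition gets its own store topic, disambiguated by the prefix.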
