fix GEARPUMP-122, refactor kafka connector API This PR mainly refactors the Kafka connector API with the following changes:
1. removes `OffsetStorage` and `KafkaStorage`, and uses `CheckpointStore` and `KafkaStore` for both offset and state checkpoint stores. 2. adds a `checkpoint(CheckpointStoreFactory)` method to `TimeReplayableSource` which makes the offset store configurable by the user. If `checkpoint` is not called, then no offset checkpoint will be performed and applications run in *at-most-once* mode. 3. moves user-facing APIs, `KafkaSource`, `KafkaSink` and `KafkaStore`, to Java and keeps their implementations in Scala, which prevents Scala package-private methods from being exposed in Java programs. 4. `KafkaSource`, `KafkaSink` and `KafkaStore` are all configurable through Java properties, which is the standard Kafka way. 5. updates corresponding UTs. Author: manuzhang <[email protected]> Closes #25 from manuzhang/kafka. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/04c3975d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/04c3975d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/04c3975d Branch: refs/heads/master Commit: 04c3975d6bdcaa083887ceb633b484e01d041bea Parents: 32f1507 Author: manuzhang <[email protected]> Authored: Wed Jun 1 16:53:58 2016 +0800 Committer: manuzhang <[email protected]> Committed: Wed Jun 1 16:53:58 2016 +0800 ---------------------------------------------------------------------- .../examples/kafka/KafkaReadWrite.scala | 19 +- .../kafka/wordcount/KafkaWordCount.scala | 26 +- .../examples/state/MessageCountApp.scala | 20 +- .../examples/wordcountjava/WordCount.java | 1 + .../hadoop/HadoopCheckpointStoreFactory.scala | 14 +- .../HadoopCheckpointStoreIntegrationSpec.scala | 5 +- .../gearpump/streaming/kafka/KafkaSink.java | 41 +++ .../gearpump/streaming/kafka/KafkaSource.java | 54 ++++ .../streaming/kafka/KafkaStoreFactory.java | 40 +++ .../streaming/kafka/util/KafkaConfig.java | 264 ++++++++++++++++++ 
.../gearpump/streaming/kafka/KafkaSink.scala | 80 ------ .../gearpump/streaming/kafka/KafkaSource.scala | 195 ------------- .../gearpump/streaming/kafka/KafkaStorage.scala | 148 ---------- .../streaming/kafka/dsl/KafkaDSLSink.scala | 15 +- .../streaming/kafka/dsl/KafkaDSLUtil.scala | 55 +--- .../kafka/lib/DefaultMessageDecoder.scala | 41 --- .../kafka/lib/KafkaOffsetManager.scala | 66 ----- .../streaming/kafka/lib/KafkaSourceConfig.scala | 178 ------------ .../streaming/kafka/lib/KafkaUtil.scala | 167 ------------ .../consumer/ExponentialBackoffSleeper.scala | 55 ---- .../kafka/lib/consumer/FetchThread.scala | 139 ---------- .../kafka/lib/consumer/KafkaConsumer.scala | 103 ------- .../kafka/lib/consumer/KafkaMessage.scala | 38 --- .../kafka/lib/grouper/KafkaDefaultGrouper.scala | 38 --- .../kafka/lib/grouper/KafkaGrouper.scala | 30 -- .../kafka/lib/sink/AbstractKafkaSink.scala | 92 +++++++ .../kafka/lib/source/AbstractKafkaSource.scala | 173 ++++++++++++ .../lib/source/DefaultMessageDecoder.scala | 38 +++ .../consumer/ExponentialBackoffSleeper.scala | 59 ++++ .../kafka/lib/source/consumer/FetchThread.scala | 164 +++++++++++ .../lib/source/consumer/KafkaConsumer.scala | 94 +++++++ .../lib/source/consumer/KafkaMessage.scala | 38 +++ .../grouper/DefaultPartitionGrouper.scala | 38 +++ .../lib/source/grouper/PartitionGrouper.scala | 30 ++ .../streaming/kafka/lib/store/KafkaStore.scala | 127 +++++++++ .../streaming/kafka/lib/util/KafkaClient.scala | 121 +++++++++ .../streaming/kafka/KafkaSinkSpec.scala | 25 +- .../streaming/kafka/KafkaSourceSpec.scala | 272 +++++++++++-------- .../streaming/kafka/KafkaStoreSpec.scala | 169 ++++++++++++ .../kafka/lib/DefaultMessageDecoderSpec.scala | 44 --- .../kafka/lib/KafkaOffsetManagerSpec.scala | 117 -------- .../streaming/kafka/lib/KafkaStorageSpec.scala | 187 ------------- .../streaming/kafka/lib/KafkaUtilSpec.scala | 107 -------- .../ExponentialBackoffSleeperSpec.scala | 68 ----- .../kafka/lib/consumer/FetchThreadSpec.scala | 113 
-------- .../kafka/lib/consumer/KafkaConsumerSpec.scala | 88 ------ .../lib/grouper/KafkaDefaultGrouperSpec.scala | 42 --- .../lib/source/DefaultMessageDecoderSpec.scala | 52 ++++ .../ExponentialBackoffSleeperSpec.scala | 68 +++++ .../lib/source/consumer/FetchThreadSpec.scala | 159 +++++++++++ .../lib/source/consumer/KafkaConsumerSpec.scala | 88 ++++++ .../grouper/DefaultPartitionGrouperSpec.scala | 42 +++ .../kafka/lib/util/KafkaClientSpec.scala | 139 ++++++++++ .../kafka/util/KafkaServerHarness.scala | 19 +- .../streaming/kafka/util/ZookeeperHarness.scala | 8 +- .../streaming/state/api/PersistentTask.scala | 3 +- .../state/impl/InMemoryCheckpointStore.scala | 2 +- .../transaction/api/CheckpointStore.scala | 9 +- .../transaction/api/MessageDecoder.scala | 11 +- .../transaction/api/OffsetManager.scala | 44 --- .../transaction/api/OffsetStorage.scala | 66 ----- .../transaction/api/TimeReplayableSource.scala | 8 +- 62 files changed, 2373 insertions(+), 2383 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala index 364544b..cfeef5b 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/KafkaReadWrite.scala @@ -18,7 +18,10 @@ package org.apache.gearpump.streaming.examples.kafka +import java.util.Properties + import akka.actor.ActorSystem +import org.apache.gearpump.streaming.kafka.util.KafkaConfig import 
org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig @@ -26,7 +29,7 @@ import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import org.apache.gearpump.partitioner.ShufflePartitioner import org.apache.gearpump.streaming.StreamApplication -import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} +import org.apache.gearpump.streaming.kafka._ import org.apache.gearpump.streaming.sink.DataSinkProcessor import org.apache.gearpump.streaming.source.DataSourceProcessor import org.apache.gearpump.util.Graph._ @@ -52,6 +55,7 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser { def application(config: ParseResult, system: ActorSystem): StreamApplication = { implicit val actorSystem = system + val appName = "KafkaReadWrite" val sourceNum = config.getInt("source") val sinkNum = config.getInt("sink") val zookeeperConnect = config.getString("zookeeperConnect") @@ -60,14 +64,19 @@ object KafkaReadWrite extends AkkaApp with ArgumentsParser { val sinkTopic = config.getString("sinkTopic") val appConfig = UserConfig.empty - val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList) - val source = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory) + val props = new Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName) + val source = new KafkaSource(sourceTopic, props) + val checkpointStoreFactory = new KafkaStoreFactory(props) + source.setCheckpointStore(checkpointStoreFactory) val sourceProcessor = DataSourceProcessor(source, sourceNum) - val sink = new KafkaSink(sinkTopic, brokerList) + val sink = new KafkaSink(sinkTopic, props) val sinkProcessor = DataSinkProcessor(sink, sinkNum) val partitioner = new ShufflePartitioner val computation = sourceProcessor ~ 
partitioner ~> sinkProcessor - val app = StreamApplication("KafkaReadWrite", Graph(computation), appConfig) + val app = StreamApplication(appName, Graph(computation), appConfig) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala index 5ef1e67..aa9842f 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala @@ -18,16 +18,18 @@ package org.apache.gearpump.streaming.examples.kafka.wordcount +import java.util.Properties + import akka.actor.ActorSystem import kafka.api.OffsetRequest +import org.apache.gearpump.streaming.kafka.util.KafkaConfig import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} import org.apache.gearpump.partitioner.HashPartitioner -import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig -import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} +import org.apache.gearpump.streaming.kafka._ import org.apache.gearpump.streaming.sink.DataSinkProcessor import org.apache.gearpump.streaming.source.DataSourceProcessor import org.apache.gearpump.streaming.{Processor, StreamApplication} @@ -48,25 +50,31 @@ object KafkaWordCount extends AkkaApp with ArgumentsParser { def application(config: ParseResult, system: ActorSystem): 
StreamApplication = { implicit val actorSystem = system + val appName = "KafkaWordCount" val sourceNum = config.getInt("source") val splitNum = config.getInt("split") val sumNum = config.getInt("sum") val sinkNum = config.getInt("sink") - val appConfig = UserConfig.empty - val offsetStorageFactory = new KafkaStorageFactory("localhost:2181", "localhost:9092") - val kafkaSourceConfig = new KafkaSourceConfig() - .withConsumerTopics("topic1").withConsumerStartOffset(OffsetRequest.LatestTime) - val source = new KafkaSource(kafkaSourceConfig, offsetStorageFactory) + val props = new Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181") + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") + props.put(KafkaConfig.CONSUMER_START_OFFSET_CONFIG, + new java.lang.Long(OffsetRequest.LatestTime)) + props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName) + val sourceTopic = "topic1" + val source = new KafkaSource(sourceTopic, props) + val checkpointStoreFactory = new KafkaStoreFactory(props) + source.setCheckpointStore(checkpointStoreFactory) val sourceProcessor = DataSourceProcessor(source, sourceNum) val split = Processor[Split](splitNum) val sum = Processor[Sum](sumNum) - val sink = new KafkaSink("topic2", "localhost:9092") + val sink = new KafkaSink("topic2", props) val sinkProcessor = DataSinkProcessor(sink, sinkNum) val partitioner = new HashPartitioner val computation = sourceProcessor ~ partitioner ~> split ~ partitioner ~> sum ~ partitioner ~> sinkProcessor - val app = StreamApplication("KafkaWordCount", Graph(computation), appConfig) + val app = StreamApplication(appName, Graph(computation), appConfig) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala ---------------------------------------------------------------------- diff --git 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala index 13bef0d..5a3954a 100644 --- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala @@ -18,7 +18,10 @@ package org.apache.gearpump.streaming.examples.state +import java.util.Properties + import akka.actor.ActorSystem +import org.apache.gearpump.streaming.kafka.util.KafkaConfig import org.apache.hadoop.conf.Configuration import org.apache.gearpump.cluster.UserConfig @@ -28,7 +31,7 @@ import org.apache.gearpump.partitioner.HashPartitioner import org.apache.gearpump.streaming.examples.state.processor.CountProcessor import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation -import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource, KafkaStorageFactory} +import org.apache.gearpump.streaming.kafka.{KafkaStoreFactory, KafkaSink, KafkaSource} import org.apache.gearpump.streaming.sink.DataSinkProcessor import org.apache.gearpump.streaming.source.DataSourceProcessor import org.apache.gearpump.streaming.state.impl.PersistentStateConfig @@ -63,6 +66,7 @@ object MessageCountApp extends AkkaApp with ArgumentsParser { ) def application(config: ParseResult)(implicit system: ActorSystem): StreamApplication = { + val appName = "MessageCount" val hadoopConfig = new Configuration hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS)) val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageCount", hadoopConfig, @@ -73,19 +77,23 @@ object MessageCountApp extends AkkaApp with ArgumentsParser { .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L) 
.withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, checkpointStoreFactory) - val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT) + val properties = new Properties + properties.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, config.getString(ZOOKEEPER_CONNECT)) val brokerList = config.getString(BROKER_LIST) - val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, brokerList) + properties.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + properties.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName) + val kafkaStoreFactory = new KafkaStoreFactory(properties) val sourceTopic = config.getString(SOURCE_TOPIC) - val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, offsetStorageFactory) + val kafkaSource = new KafkaSource(sourceTopic, properties) + kafkaSource.setCheckpointStore(kafkaStoreFactory) val sourceProcessor = DataSourceProcessor(kafkaSource, config.getInt(SOURCE_TASK)) val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), taskConf = taskConfig) - val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList) + val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), properties) val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK)) val partitioner = new HashPartitioner() val graph = Graph(sourceProcessor ~ partitioner ~> countProcessor ~ partitioner ~> sinkProcessor) - val app = StreamApplication("MessageCount", graph, UserConfig.empty) + val app = StreamApplication(appName, graph, UserConfig.empty) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java index ee44536..6b5bba0 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/WordCount.java @@ -28,6 +28,7 @@ import org.apache.gearpump.partitioner.Partitioner; import org.apache.gearpump.streaming.javaapi.Graph; import org.apache.gearpump.streaming.javaapi.Processor; import org.apache.gearpump.streaming.javaapi.StreamApplication; +import org.apache.gearpump.util.Constants; /** Java version of WordCount with Processor Graph API */ public class WordCount { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala index e5e0f13..acc2438 100644 --- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala +++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala @@ -40,21 +40,27 @@ class HadoopCheckpointStoreFactory( extends CheckpointStoreFactory { import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory._ + /** + * Overrides Java's default serialization + * Please do not remove this + */ private def writeObject(out: ObjectOutputStream): Unit = { out.defaultWriteObject() hadoopConfig.write(out) } + /** + * Overrides Java's default deserialization + * Please do not remove this + */ private def readObject(in: ObjectInputStream): Unit = { 
in.defaultReadObject() hadoopConfig = new Configuration(false) hadoopConfig.readFields(in) } - override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = { - import taskContext.{appId, taskId} - val dirPath = new Path(dir + Path.SEPARATOR + s"v$VERSION", - s"app$appId-task${taskId.processorId}_${taskId.index}") + override def getCheckpointStore(name: String): CheckpointStore = { + val dirPath = new Path(dir + Path.SEPARATOR + s"v$VERSION", name) val fs = HadoopUtil.getFileSystemForPath(dirPath, hadoopConfig) new HadoopCheckpointStore(dirPath, fs, hadoopConfig, rotation) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala ---------------------------------------------------------------------- diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala index 4fd8dc1..76145e8 100644 --- a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala +++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala @@ -48,7 +48,8 @@ class HadoopCheckpointStoreIntegrationSpec val rootDirName = "test" val rootDir = new Path(rootDirName + Path.SEPARATOR + s"v${HadoopCheckpointStoreFactory.VERSION}") - val subDir = new Path(rootDir, "app0-task0_0") + val subDirName = "app0-task0_0" + val subDir = new Path(rootDir, subDirName) val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig) fs.delete(rootDir, true) @@ -56,7 +57,7 @@ class HadoopCheckpointStoreIntegrationSpec val checkpointStoreFactory = new HadoopCheckpointStoreFactory( rootDirName, hadoopConfig, new FileSizeRotation(fileSize)) - val checkpointStore = 
checkpointStoreFactory.getCheckpointStore(userConfig, taskContext) + val checkpointStore = checkpointStoreFactory.getCheckpointStore(subDirName) checkpointStore.persist(0L, Array(0.toByte)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSink.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSink.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSink.java new file mode 100644 index 0000000..f8d85f7 --- /dev/null +++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSink.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.kafka; + +import org.apache.gearpump.streaming.kafka.lib.sink.AbstractKafkaSink; +import org.apache.gearpump.streaming.kafka.util.KafkaConfig; +import org.apache.gearpump.streaming.sink.DataSink; + +import java.util.Properties; + +/** + * USER API for kafka sink connector. + * Please refer to {@link AbstractKafkaSink} for detailed descriptions and implementations. 
+ */ +public class KafkaSink extends AbstractKafkaSink implements DataSink { + + public KafkaSink(String topic, Properties props) { + super(topic, props); + } + + KafkaSink(String topic, Properties props, + KafkaConfig.KafkaConfigFactory kafkaConfigFactory, + KafkaProducerFactory factory) { + super(topic, props, kafkaConfigFactory, factory); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSource.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSource.java new file mode 100644 index 0000000..fdc279b --- /dev/null +++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaSource.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.kafka; + +import kafka.common.TopicAndPartition; +import org.apache.gearpump.streaming.kafka.lib.source.AbstractKafkaSource; +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient; +import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread; +import org.apache.gearpump.streaming.kafka.util.KafkaConfig; +import org.apache.gearpump.streaming.transaction.api.CheckpointStore; +import org.apache.gearpump.streaming.transaction.api.TimeReplayableSource; + +import java.util.Properties; + +/** + * USER API for kafka source connector. + * Please refer to {@link AbstractKafkaSource} for detailed descriptions and implementations. + */ +public class KafkaSource extends AbstractKafkaSource implements TimeReplayableSource { + + public KafkaSource(String topic, Properties properties) { + super(topic, properties); + } + + // constructor for tests + KafkaSource(String topic, Properties properties, + KafkaConfig.KafkaConfigFactory configFactory, + KafkaClient.KafkaClientFactory clientFactory, + FetchThread.FetchThreadFactory threadFactory) { + super(topic, properties, configFactory, clientFactory, threadFactory); + } + + /** + * for tests only + */ + protected void addPartitionAndStore(TopicAndPartition tp, CheckpointStore store) { + addCheckpointStore(tp, store); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaStoreFactory.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaStoreFactory.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaStoreFactory.java new file mode 100644 index 0000000..2521e70 --- /dev/null +++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/KafkaStoreFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.kafka; + +import org.apache.gearpump.streaming.kafka.lib.store.AbstractKafkaStoreFactory; +import org.apache.gearpump.streaming.kafka.util.KafkaConfig; +import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory; + +import java.util.Properties; + +/** + * USER API for kafka store factory. + * Please refer to {@link AbstractKafkaStoreFactory} for detailed descriptions and implementations. 
+ */ +public class KafkaStoreFactory extends AbstractKafkaStoreFactory implements CheckpointStoreFactory { + + public KafkaStoreFactory(Properties props) { + super(props); + } + + /** constructor for tests */ + KafkaStoreFactory(Properties props, KafkaConfig.KafkaConfigFactory factory) { + super(props, factory); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java new file mode 100644 index 0000000..403f213 --- /dev/null +++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka.util; + +import kafka.api.OffsetRequest; +import kafka.common.TopicAndPartition; +import kafka.consumer.ConsumerConfig; +import org.apache.gearpump.streaming.kafka.lib.source.DefaultMessageDecoder; +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient; +import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread; +import org.apache.gearpump.streaming.kafka.lib.source.grouper.DefaultPartitionGrouper; +import org.apache.gearpump.streaming.source.DefaultTimeStampFilter; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.io.Serializable; +import java.util.Properties; + + +/** + * kafka specific configs + */ +public class KafkaConfig extends AbstractConfig implements Serializable { + + private static final ConfigDef CONFIG; + + public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; + private static final String ZOOKEEPER_CONNECT_DOC = + "Zookeeper connect string for Kafka topics management."; + + public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers"; + public static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port pairs to use for " + + "establishing the initial connection to the Kafka cluster. " + + "The client will make use of all servers irrespective of which servers are specified " + + "here for bootstrapping—this list only impacts the initial hosts used to discover " + + "the full set of servers. This list should be in the form " + + "<code>host1:port1,host2:port2,...</code>. 
Since these servers are just used for the " + + "initial connection to discover the full cluster membership (which may change dynamically)," + + " this list need not contain the full set of servers (you may want more than one, though, " + + "in case a server is down)."; + + public static final String CLIENT_ID_CONFIG = "client.id"; + public static final String CLIENT_ID_DOC = "An id string to pass to the server when making " + + "requests. The purpose of this is to be able to track the source of requests beyond just " + + "ip/port by allowing a logical application name to be included in server-side request " + + "logging."; + + public static final String GROUP_ID_CONFIG = "group.id"; + public static final String GROUP_ID_DOC = + "a string that uniquely identifies a set of consumers within the same consumer group"; + + public static final String ENABLE_AUTO_COMMIT_CONFIG = "auto.commit.enable"; + public static final String ENABLE_AUTO_COMMIT_DOC = + "If true the consumer's offset will be periodically committed in the background."; + + /** KafkaSource specific configs */ + public static final String CONSUMER_START_OFFSET_CONFIG = "consumer.start.offset"; + private static final String CONSUMER_START_OFFSET_DOC = "kafka offset to start consume from. " + + "This will be overwritten when checkpoint recover takes effect."; + + public static final String FETCH_THRESHOLD_CONFIG = "fetch.threshold"; + private static final String FETCH_THRESHOLD_DOC = "kafka messages are fetched asynchronously " + + "and put onto a internal queue. When the number of messages in the queue hit the threshold," + + "the fetch thread stops fetching, and goes to sleep. 
It starts fetching again when the" + + "number falls below the threshold"; + + public static final String FETCH_SLEEP_MS_CONFIG = "fetch.sleep.ms"; + private static final String FETCH_SLEEP_MS_DOC = + "The amount of time to sleep when hitting fetch.threshold."; + + public static final String MESSAGE_DECODER_CLASS_CONFIG = "message.decoder.class"; + private static final String MESSAGE_DECODER_CLASS_DOC = + "Message decoder class that implements the <code>MessageDecoder</code> interface."; + + public static final String TIMESTAMP_FILTER_CLASS_CONFIG = "timestamp.filter.class"; + private static final String TIMESTAMP_FILTER_CLASS_DOC = + "Timestamp filter class that implements the <code>TimeStampFilter</code> interface"; + + public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; + private static final String PARTITION_GROUPER_CLASS_DOC = + "Partition grouper class that implements the <code>KafkaGrouper</code> interface."; + + public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; + public static final String REPLICATION_FACTOR_DOC = + "The replication factor for checkpoint store topic."; + + public static final String CHECKPOINT_STORE_NAME_PREFIX_CONFIG = "checkpoint.store.name.prefix"; + public static final String CHECKPOINT_STORE_NAME_PREFIX_DOC = "name prefix for checkpoint " + + "store whose name will be of the form, namePrefix-sourceTopic-partitionId"; + + static { + CONFIG = new ConfigDef() + .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value + ConfigDef.Type.LIST, + ConfigDef.Importance.HIGH, + BOOTSTRAP_SERVERS_DOC) + .define(CLIENT_ID_CONFIG, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.HIGH, + CLIENT_ID_DOC) + .define(GROUP_ID_CONFIG, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.HIGH, + GROUP_ID_DOC) + .define(ZOOKEEPER_CONNECT_CONFIG, // required with no default value + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + ZOOKEEPER_CONNECT_DOC) + 
.define(REPLICATION_FACTOR_CONFIG, + ConfigDef.Type.INT, + 1, + ConfigDef.Range.atLeast(1), + ConfigDef.Importance.MEDIUM, + REPLICATION_FACTOR_DOC) + .define(MESSAGE_DECODER_CLASS_CONFIG, + ConfigDef.Type.CLASS, + DefaultMessageDecoder.class.getName(), + ConfigDef.Importance.MEDIUM, + MESSAGE_DECODER_CLASS_DOC) + .define(TIMESTAMP_FILTER_CLASS_CONFIG, + ConfigDef.Type.CLASS, + DefaultTimeStampFilter.class.getName(), + ConfigDef.Importance.MEDIUM, + TIMESTAMP_FILTER_CLASS_DOC) + .define(PARTITION_GROUPER_CLASS_CONFIG, + ConfigDef.Type.CLASS, + DefaultPartitionGrouper.class.getName(), + ConfigDef.Importance.MEDIUM, + PARTITION_GROUPER_CLASS_DOC) + .define(FETCH_THRESHOLD_CONFIG, + ConfigDef.Type.INT, + 10000, + ConfigDef.Range.atLeast(0), + ConfigDef.Importance.LOW, + FETCH_THRESHOLD_DOC) + .define(FETCH_SLEEP_MS_CONFIG, + ConfigDef.Type.INT, + 100, + ConfigDef.Range.atLeast(0), + ConfigDef.Importance.LOW, + FETCH_SLEEP_MS_DOC) + .define(CONSUMER_START_OFFSET_CONFIG, + ConfigDef.Type.LONG, + OffsetRequest.EarliestTime(), + ConfigDef.Range.atLeast(-2), + ConfigDef.Importance.MEDIUM, + CONSUMER_START_OFFSET_DOC) + .define(ENABLE_AUTO_COMMIT_CONFIG, + ConfigDef.Type.BOOLEAN, + true, + ConfigDef.Importance.MEDIUM, + ENABLE_AUTO_COMMIT_DOC) + .define(CHECKPOINT_STORE_NAME_PREFIX_CONFIG, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.HIGH, + CHECKPOINT_STORE_NAME_PREFIX_DOC); + } + + public KafkaConfig(Properties props) { + super(CONFIG, props); + } + + public static String getCheckpointStoreNameSuffix(TopicAndPartition tp) { + return tp.topic() + "-" + tp.partition(); + } + + public ConsumerConfig getConsumerConfig() { + Properties props = getBaseConsumerConfigs(); + + return new ConsumerConfig(props); + } + + public Properties getProducerConfig() { + Properties props = new Properties(); + props.putAll(this.originals()); + + // remove source properties + removeSourceSpecificConfigs(props); + + // remove consumer properties + removeConsumerSpecificConfigs(props); + 
+ return props; + } + + public String getKafkaStoreTopic(String suffix) { + return getString(CHECKPOINT_STORE_NAME_PREFIX_CONFIG) + "-" + suffix; + } + + public KafkaClient.KafkaClientFactory getKafkaClientFactory() { + return KafkaClient.factory(); + } + + public FetchThread.FetchThreadFactory getFetchThreadFactory() { + return FetchThread.factory(); + } + + + private Properties getBaseConsumerConfigs() { + Properties props = new Properties(); + props.putAll(this.originals()); + + // remove source properties + removeSourceSpecificConfigs(props); + + // remove producer properties + removeProducerSpecificConfigs(props); + + // set consumer default property values + if (!props.containsKey(GROUP_ID_CONFIG)) { + props.put(GROUP_ID_CONFIG, getString(GROUP_ID_CONFIG)); + } + props.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); + + return props; + } + + private void removeSourceSpecificConfigs(Properties props) { + props.remove(FETCH_SLEEP_MS_CONFIG); + props.remove(FETCH_THRESHOLD_CONFIG); + props.remove(PARTITION_GROUPER_CLASS_CONFIG); + props.remove(MESSAGE_DECODER_CLASS_CONFIG); + props.remove(TIMESTAMP_FILTER_CLASS_CONFIG); + props.remove(REPLICATION_FACTOR_CONFIG); + props.remove(CHECKPOINT_STORE_NAME_PREFIX_CONFIG); + } + + private void removeConsumerSpecificConfigs(Properties props) { + props.remove(ZOOKEEPER_CONNECT_CONFIG); + props.remove(GROUP_ID_CONFIG); + } + + private void removeProducerSpecificConfigs(Properties props) { + props.remove(BOOTSTRAP_SERVERS_CONFIG); + } + + + public static class KafkaConfigFactory implements Serializable { + public KafkaConfig getKafkaConfig(Properties props) { + return new KafkaConfig(props); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala deleted file mode 100644 index cb90f93..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSink.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.kafka - -import java.util.Properties - -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.serialization.ByteArraySerializer - -import org.apache.gearpump.Message -import org.apache.gearpump.streaming.kafka.lib.KafkaUtil -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.task.TaskContext - -/** - * kafka sink connectors that invokes org.apache.kafka.clients.producer.KafkaProducer to send - * messages to kafka queue - * @param getProducer is a function to construct a KafkaProducer - * @param topic is the kafka topic to write to - */ -class KafkaSink private[kafka]( - getProducer: () => KafkaProducer[Array[Byte], Array[Byte]], topic: String) extends DataSink { - - /** - * @param topic producer topic - * @param properties producer config - */ - def this(topic: String, properties: Properties) = { - this(() => KafkaUtil.createKafkaProducer(properties, - new ByteArraySerializer, new ByteArraySerializer), topic) - } - - /** - * - * creates an empty properties with `bootstrap.servers` set to `bootstrapServers` - * and invokes `KafkaSink(topic, properties)` - * @param topic producer topic - * @param bootstrapServers kafka producer config `bootstrap.servers` - */ - def this(topic: String, bootstrapServers: String) = { - this(topic, KafkaUtil.buildProducerConfig(bootstrapServers)) - } - - // Lazily construct producer since KafkaProducer is not serializable - private lazy val producer = getProducer() - - override def open(context: TaskContext): Unit = {} - - override def write(message: Message): Unit = { - val record = message.msg match { - case (k, v) => - new ProducerRecord[Array[Byte], Array[Byte]](topic, k.asInstanceOf[Array[Byte]], - v.asInstanceOf[Array[Byte]]) - case v => - new ProducerRecord[Array[Byte], Array[Byte]](topic, v.asInstanceOf[Array[Byte]]) - } - producer.send(record) - } - - override def close(): Unit = { - 
producer.close() - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala deleted file mode 100644 index 339711b..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.kafka - -import java.util.Properties -import scala.collection.mutable.ArrayBuffer -import scala.util.{Failure, Success} - -import kafka.common.TopicAndPartition -import org.slf4j.Logger - -import org.apache.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage} -import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaOffsetManager, KafkaSourceConfig, KafkaUtil} -import org.apache.gearpump.streaming.source.DefaultTimeStampFilter -import org.apache.gearpump.streaming.task.TaskContext -import org.apache.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty -import org.apache.gearpump.streaming.transaction.api._ -import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} - -object KafkaSource { - private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource]) -} - -/** - * Kafka source connectors that pulls a batch of messages (`kafka.consumer.emit.batch.size`) - * from multiple Kafka TopicAndPartition in a round-robin way. - * - * This is a TimeReplayableSource which is able to replay messages given a start time. - * Each kafka message is tagged with a timestamp by - * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp) - * mapping is stored to a [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]. - * On recovery, we could retrieve the previously stored offset from the - * [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] by timestamp and start to read - * from there. - * - * kafka message is wrapped into gearpump [[org.apache.gearpump.Message]] and further filtered by a - * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]] - * such that obsolete messages are dropped. 
- * - * @param config kafka source config - * @param offsetStorageFactory factory to build [[OffsetStorage]] - * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes - * @param timestampFilter filters out message based on timestamp - * @param fetchThread fetches messages and puts on a in-memory queue - * @param offsetManagers manages offset-to-timestamp storage for each kafka.common.TopicAndPartition - */ -class KafkaSource( - config: KafkaSourceConfig, - offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder = new DefaultMessageDecoder, - timestampFilter: TimeStampFilter = new DefaultTimeStampFilter, - private var fetchThread: Option[FetchThread] = None, - private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = { - Map.empty[TopicAndPartition, KafkaOffsetManager] - }) extends TimeReplayableSource { - import org.apache.gearpump.streaming.kafka.KafkaSource._ - - private var startTime: Option[TimeStamp] = None - - /** - * Constructs a Kafka Source by... - * - * @param topics comma-separated string of topics - * @param properties kafka consumer config - * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory - * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] - * - */ - def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory) = { - this(KafkaSourceConfig(properties).withConsumerTopics(topics), offsetStorageFactory) - } - /** - * Constructs a Kafka Source by... 
- * - * @param topics comma-separated string of topics - * @param properties kafka consumer config - * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory - * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] - * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes - * @param timestampFilter filters out message based on timestamp - */ - def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = { - this(KafkaSourceConfig(properties) - .withConsumerTopics(topics), offsetStorageFactory, - messageDecoder, timestampFilter) - } - - /** - * Constructs a Kafka Source by... - * - * @param topics comma-separated string of topics - * @param zkConnect kafka consumer config `zookeeper.connect` - * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory - * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] - */ - def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory) = - this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory) - - /** - * Constructs a Kafka Source by... 
- * - * @param topics comma-separated string of topics - * @param zkConnect kafka consumer config `zookeeper.connect` - * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory - * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] - * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes - * @param timestampFilter filters out message based on timestamp - */ - def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, - timestampFilter: TimeStampFilter) = { - this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory, - messageDecoder, timestampFilter) - } - - LOG.debug(s"assigned ${offsetManagers.keySet}") - - private[kafka] def setStartTime(startTime: Option[TimeStamp]): Unit = { - this.startTime = startTime - fetchThread.foreach { fetch => - this.startTime.foreach { time => - offsetManagers.foreach { case (tp, offsetManager) => - offsetManager.resolveOffset(time) match { - case Success(offset) => - LOG.debug(s"set start offset to $offset for $tp") - fetch.setStartOffset(tp, offset) - case Failure(StorageEmpty) => - LOG.debug(s"no previous TimeStamp stored") - case Failure(e) => throw e - } - } - } - fetch.setDaemon(true) - fetch.start() - } - } - - override def open(context: TaskContext, startTime: TimeStamp): Unit = { - import context.{appId, appName, parallelism, taskId} - - val topics = config.getConsumerTopics - val grouper = config.getGrouper - val consumerConfig = config.consumerConfig - val topicAndPartitions = grouper.group(parallelism, taskId.index, - KafkaUtil.getTopicAndPartitions(KafkaUtil.connectZookeeper(consumerConfig)(), topics)) - this.fetchThread = Some(FetchThread(topicAndPartitions, config.getFetchThreshold, - config.getFetchSleepMS, config.getConsumerStartOffset, consumerConfig)) - this.offsetManagers = topicAndPartitions.map { tp => - val storageTopic = 
s"app${appId}_${appName}_${tp.topic}_${tp.partition}" - val storage = offsetStorageFactory.getOffsetStorage(storageTopic) - tp -> new KafkaOffsetManager(storage) - }.toMap - - setStartTime(Option(startTime)) - } - - override def read(): Message = { - fetchThread.flatMap(_.poll.flatMap(filterMessage)).orNull - } - - private def filterMessage(kafkaMsg: KafkaMessage): Option[Message] = { - val msgOpt = offsetManagers(kafkaMsg.topicAndPartition) - .filter(messageDecoder.fromBytes(kafkaMsg.msg) -> kafkaMsg.offset) - msgOpt.flatMap { msg => - startTime match { - case None => - Some(msg) - case Some(time) => - timestampFilter.filter(msg, time) - } - } - } - - override def close(): Unit = { - offsetManagers.foreach(_._2.close()) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala deleted file mode 100644 index 8748999..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka - -import java.util.Properties -import scala.collection.mutable -import scala.util.{Failure, Success, Try} - -import com.twitter.bijection.Injection -import kafka.api.OffsetRequest -import kafka.consumer.ConsumerConfig -import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.slf4j.Logger - -import org.apache.gearpump.TimeStamp -import org.apache.gearpump.streaming.kafka.lib.KafkaUtil -import org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer -import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} -import org.apache.gearpump.streaming.transaction.api.{OffsetStorage, OffsetStorageFactory} -import org.apache.gearpump.util.LogUtil - -/** - * Factory that builds [[KafkaStorage]] - * - * @param consumerProps kafka consumer config - * @param producerProps kafka producer config - */ -class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties) - extends OffsetStorageFactory { - - /** - * Creates consumer config properties with `zookeeper.connect` set to zkConnect - * and producer config properties with `bootstrap.servers` set to bootstrapServers - * - * @param zkConnect kafka consumer config `zookeeper.connect` - * @param bootstrapServers kafka producer config `bootstrap.servers` - */ - def this(zkConnect: String, bootstrapServers: String) = - this(KafkaUtil.buildConsumerConfig(zkConnect), 
KafkaUtil.buildProducerConfig(bootstrapServers)) - - override def getOffsetStorage(dir: String): OffsetStorage = { - val topic = dir - val consumerConfig = new ConsumerConfig(consumerProps) - val getConsumer = () => KafkaConsumer(topic, 0, OffsetRequest.EarliestTime, consumerConfig) - new KafkaStorage(topic, KafkaUtil.createKafkaProducer[Array[Byte], Array[Byte]]( - producerProps, new ByteArraySerializer, new ByteArraySerializer), - getConsumer(), KafkaUtil.connectZookeeper(consumerConfig)()) - } -} - -object KafkaStorage { - private val LOG: Logger = LogUtil.getLogger(classOf[KafkaStorage]) -} - -/** - * Stores offset-timestamp mapping to kafka - * - * @param topic kafka store topic - * @param producer kafka producer - * @param getConsumer function to get kafka consumer - * @param connectZk function to connect zookeeper - */ -class KafkaStorage private[kafka]( - topic: String, - producer: KafkaProducer[Array[Byte], Array[Byte]], - getConsumer: => KafkaConsumer, - connectZk: => ZkClient) - extends OffsetStorage { - - private lazy val consumer = getConsumer - - private val dataByTime: List[(TimeStamp, Array[Byte])] = { - if (KafkaUtil.topicExists(connectZk, topic)) { - load(consumer) - } else { - List.empty[(TimeStamp, Array[Byte])] - } - } - - /** - * Offsets with timestamp less than `time` have already been processed by the system - * so we look up the storage for the first offset with timestamp large equal than `time` - * on replay. 
- * - * @param time the timestamp to look up for the earliest unprocessed offset - * @return the earliest unprocessed offset if `time` is in the range, otherwise failure - */ - override def lookUp(time: TimeStamp): Try[Array[Byte]] = { - if (dataByTime.isEmpty) { - Failure(StorageEmpty) - } else { - val min = dataByTime.head - val max = dataByTime.last - if (time < min._1) { - Failure(Underflow(min._2)) - } else if (time > max._1) { - Failure(Overflow(max._2)) - } else { - Success(dataByTime.find(_._1 >= time).get._2) - } - } - } - - override def append(time: TimeStamp, offset: Array[Byte]): Unit = { - val message = new ProducerRecord[Array[Byte], Array[Byte]]( - topic, 0, Injection[Long, Array[Byte]](time), offset) - producer.send(message) - } - - override def close(): Unit = { - producer.close() - KafkaUtil.deleteTopic(connectZk, topic) - } - - private[kafka] def load(consumer: KafkaConsumer): List[(TimeStamp, Array[Byte])] = { - var messagesBuilder = new mutable.ArrayBuilder.ofRef[(TimeStamp, Array[Byte])] - while (consumer.hasNext) { - val kafkaMsg = consumer.next - kafkaMsg.key.map { k => - Injection.invert[TimeStamp, Array[Byte]](k) match { - case Success(time) => - messagesBuilder += (time -> kafkaMsg.msg) - case Failure(e) => throw e - } - } orElse (throw new RuntimeException("offset key should not be null")) - } - consumer.close() - messagesBuilder.result().toList - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala index 5f48b43..b34149f 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala +++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala @@ -28,18 +28,11 @@ class KafkaDSLSink[T](stream: dsl.Stream[T]) { /** Create a Kafka DSL Sink */ def writeToKafka( topic: String, - bootstrapServers: String, - parallism: Int, - description: String): dsl.Stream[T] = { - stream.sink(new KafkaSink(topic, bootstrapServers), parallism, UserConfig.empty, description) - } - - def writeToKafka( - topic: String, properties: Properties, - parallism: Int, - description: String): dsl.Stream[T] = { - stream.sink(new KafkaSink(topic, properties), parallism, UserConfig.empty, description) + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = null): dsl.Stream[T] = { + stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala index 0275966..874d691 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala @@ -22,67 +22,16 @@ import java.util.Properties import org.apache.gearpump.streaming.dsl import org.apache.gearpump.streaming.dsl.StreamApp import org.apache.gearpump.streaming.kafka.KafkaSource -import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig} -import org.apache.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter} object KafkaDSLUtil { - def createStream[T]( - app: StreamApp, - parallelism: Int, - description: String, - kafkaConfig: 
KafkaSourceConfig, - offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder = new DefaultMessageDecoder): dsl.Stream[T] = { - app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder), - parallelism, description) - } - - def createStream[T]( - app: StreamApp, - parallelism: Int, - description: String, - topics: String, - zkConnect: String, - offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory), - parallelism, description) - } - - def createStream[T]( - app: StreamApp, - parallelism: Int, - description: String, - topics: String, - zkConnect: String, - offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, - timestampFilter: TimeStampFilter): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory, - messageDecoder, timestampFilter), parallelism, description) - } - - def createStream[T]( - app: StreamApp, - parallelism: Int, - description: String, - topics: String, - properties: Properties, - offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, properties, offsetStorageFactory), - parallelism, description) - } def createStream[T]( app: StreamApp, topics: String, parallelism: Int, description: String, - properties: Properties, - offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, - timestampFilter: TimeStampFilter): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, properties, offsetStorageFactory, - messageDecoder, timestampFilter), parallelism, description) + properties: Properties): dsl.Stream[T] = { + app.source[T](new KafkaSource(topics, properties), parallelism, description) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala 
---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala deleted file mode 100644 index ea7e8d1..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.kafka.lib - -import scala.util.{Failure, Success} - -import com.twitter.bijection.Injection - -import org.apache.gearpump.Message -import org.apache.gearpump.streaming.transaction.api.MessageDecoder - -class DefaultMessageDecoder extends MessageDecoder { - override def fromBytes(bytes: Array[Byte]): Message = { - Message(bytes, System.currentTimeMillis()) - } -} - -class StringMessageDecoder extends MessageDecoder { - override def fromBytes(bytes: Array[Byte]): Message = { - Injection.invert[String, Array[Byte]](bytes) match { - case Success(s) => Message(s, System.currentTimeMillis()) - case Failure(e) => throw e - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala deleted file mode 100644 index 88f509b..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib - -import scala.util.{Failure, Success, Try} - -import com.twitter.bijection.Injection -import org.slf4j.Logger - -import org.apache.gearpump._ -import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} -import org.apache.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage} -import org.apache.gearpump.util.LogUtil - -object KafkaOffsetManager { - private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager]) -} - -private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends OffsetManager { - import org.apache.gearpump.streaming.kafka.lib.KafkaOffsetManager._ - - var maxTime: TimeStamp = 0L - - override def filter(messageAndOffset: (Message, Long)): Option[Message] = { - val (message, offset) = messageAndOffset - if (message.timestamp > maxTime) { - maxTime = message.timestamp - storage.append(maxTime, Injection[Long, Array[Byte]](offset)) - } - Some(message) - } - - override def resolveOffset(time: TimeStamp): Try[Long] = { - storage.lookUp(time) match { - case Success(offset) => Injection.invert[Long, Array[Byte]](offset) - case Failure(Overflow(max)) => - LOG.warn(s"start time larger than the max stored TimeStamp; set to max offset") - Injection.invert[Long, Array[Byte]](max) - case Failure(Underflow(min)) => - LOG.warn(s"start time less than the min stored TimeStamp; set to min offset") - Injection.invert[Long, Array[Byte]](min) - case Failure(StorageEmpty) => Failure(StorageEmpty) - case Failure(e) => throw e - } - 
} - - override def close(): Unit = { - storage.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala deleted file mode 100644 index ade414e..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.kafka.lib - -import java.util.Properties - -import kafka.api.OffsetRequest -import kafka.consumer.ConsumerConfig -import org.slf4j.Logger - -import org.apache.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper} -import org.apache.gearpump.util.LogUtil - -object KafkaSourceConfig { - - val NAME = "kafka_config" - - val ZOOKEEPER_CONNECT = "zookeeper.connect" - val GROUP_ID = "group.id" - val CONSUMER_START_OFFSET = "kafka.consumer.start.offset" - val CONSUMER_TOPICS = "kafka.consumer.topics" - val FETCH_THRESHOLD = "kafka.consumer.fetch.threshold" - val FETCH_SLEEP_MS = "kafka.consumer.fetch.sleep.ms" - val GROUPER_CLASS = "kafka.grouper.class" - - private val LOG: Logger = LogUtil.getLogger(getClass) - - def apply(consumerProps: Properties): KafkaSourceConfig = new KafkaSourceConfig(consumerProps) -} - -/** - * Extends kafka.consumer.ConsumerConfig with specific config needed by - * [[org.apache.gearpump.streaming.kafka.KafkaSource]] - * - * @param consumerProps kafka consumer config - */ -class KafkaSourceConfig(val consumerProps: Properties = new Properties) - extends java.io.Serializable { - import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig._ - - if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) { - consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181") - } - - if (!consumerProps.containsKey(GROUP_ID)) { - consumerProps.setProperty(GROUP_ID, "gearpump") - } - - def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps) - - /** - * Set kafka consumer topics, seperated by comma. 
- * - * @param topics comma-separated string - * @return new KafkaConfig based on this but with - * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]] - * set to given value - */ - def withConsumerTopics(topics: String): KafkaSourceConfig = { - consumerProps.setProperty(CONSUMER_TOPICS, topics) - KafkaSourceConfig(consumerProps) - } - - /** - * Returns a list of kafka consumer topics - */ - def getConsumerTopics: List[String] = { - Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList - } - - /** - * Sets the sleep interval if there are no more message or message buffer is full. - * - * Consumer.FetchThread will sleep for a while if no more messages or - * the incoming queue size is above the - * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] - * - * @param sleepMS sleep interval in milliseconds - * @return new KafkaConfig based on this but with - * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]] - * set to given value - */ - def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = { - consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS") - KafkaSourceConfig(consumerProps) - } - - /** - * Gets the sleep interval - * - * Consumer.FetchThread sleeps for a while if no more messages or - * the incoming queue is full (size is bigger than the - * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]) - * - * @return sleep interval in milliseconds - */ - def getFetchSleepMS: Int = { - Option(consumerProps.getProperty(FETCH_SLEEP_MS)).getOrElse("100").toInt - } - - /** - * Sets the batch size we use for one fetch. 
- * - * Consumer.FetchThread stops fetching new messages if its incoming queue - * size is above the threshold and starts again when the queue size is below it - * - * @param threshold queue size - * @return new KafkaConfig based on this but with - * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] - * set to give value - */ - def withFetchThreshold(threshold: Int): KafkaSourceConfig = { - consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold") - KafkaSourceConfig(consumerProps) - } - - /** - * Returns fetch batch size. - * - * Consumer.FetchThread stops fetching new messages if - * its incoming queue size is above the threshold and starts again when the queue size is below it - * - * @return fetch threshold - */ - def getFetchThreshold: Int = { - Option(consumerProps.getProperty(FETCH_THRESHOLD)).getOrElse("10000").toInt - } - - /** - * Sets [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which - * defines how kafka.common.TopicAndPartitions are mapped to source tasks. 
- * - * @param className name of the factory class - * @return new KafkaConfig based on this but with - * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]] - * set to given value - */ - def withGrouper(className: String): KafkaSourceConfig = { - consumerProps.setProperty(GROUPER_CLASS, className) - KafkaSourceConfig(consumerProps) - } - - /** - * Returns [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which - * defines how kafka.common.TopicAndPartitions are mapped to source tasks - */ - def getGrouper: KafkaGrouper = { - Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS)) - .getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper] - } - - def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = { - consumerProps.setProperty(CONSUMER_START_OFFSET, s"$earliestOrLatest") - KafkaSourceConfig(consumerProps) - } - - def getConsumerStartOffset: Long = { - Option(consumerProps.getProperty(CONSUMER_START_OFFSET)) - .getOrElse(s"${OffsetRequest.EarliestTime}").toLong - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala deleted file mode 100644 index e8cf574..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib - -import java.io.InputStream -import java.util.Properties - -import kafka.admin.AdminUtils -import kafka.cluster.Broker -import kafka.common.TopicAndPartition -import kafka.consumer.ConsumerConfig -import kafka.utils.{ZKStringSerializer, ZkUtils} -import org.I0Itec.zkclient.ZkClient -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} -import org.apache.kafka.common.serialization.Serializer -import org.slf4j.Logger - -import org.apache.gearpump.util.LogUtil - -object KafkaUtil { - private val LOG: Logger = LogUtil.getLogger(getClass) - - def getBroker(connectZk: => ZkClient, topic: String, partition: Int): Broker = { - val zkClient = connectZk - try { - val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) - .getOrElse(throw new RuntimeException( - s"leader not available for TopicAndPartition($topic, $partition)")) - ZkUtils.getBrokerInfo(zkClient, leader) - .getOrElse(throw new RuntimeException(s"broker info not found for leader $leader")) - } catch { - case e: Exception => - LOG.error(e.getMessage) - throw e - } finally { - zkClient.close() - } - } - - def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String]) - : Array[TopicAndPartition] = { - val zkClient = connectZk - try { - ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap { - case (topic, partitions) => 
partitions.map(TopicAndPartition(topic, _)) - }.toArray - } catch { - case e: Exception => - LOG.error(e.getMessage) - throw e - } finally { - zkClient.close() - } - } - - def topicExists(connectZk: => ZkClient, topic: String): Boolean = { - val zkClient = connectZk - try { - AdminUtils.topicExists(zkClient, topic) - } catch { - case e: Exception => - LOG.error(e.getMessage) - throw e - } finally { - zkClient.close() - } - } - - /** - * create a new kafka topic - * return true if topic already exists, and false otherwise - */ - def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int) - : Boolean = { - val zkClient = connectZk - try { - if (AdminUtils.topicExists(zkClient, topic)) { - LOG.info(s"topic $topic exists") - true - } else { - AdminUtils.createTopic(zkClient, topic, partitions, replicas) - LOG.info(s"created topic $topic") - false - } - } catch { - case e: Exception => - LOG.error(e.getMessage) - throw e - } finally { - zkClient.close() - } - } - - def deleteTopic(connectZk: => ZkClient, topic: String): Unit = { - val zkClient = connectZk - try { - AdminUtils.deleteTopic(zkClient, topic) - } catch { - case e: Exception => - LOG.error(e.getMessage) - } finally { - zkClient.close() - } - } - - def connectZookeeper(config: ConsumerConfig): () => ZkClient = { - val zookeeperConnect = config.zkConnect - val sessionTimeout = config.zkSessionTimeoutMs - val connectionTimeout = config.zkConnectionTimeoutMs - () => new ZkClient(zookeeperConnect, sessionTimeout, connectionTimeout, ZKStringSerializer) - } - - def loadProperties(filename: String): Properties = { - val props = new Properties() - var propStream: InputStream = null - try { - propStream = getClass.getClassLoader.getResourceAsStream(filename) - props.load(propStream) - } catch { - case e: Exception => - LOG.error(s"$filename not found") - } finally { - if (propStream != null) { - propStream.close() - } - } - props - } - - def createKafkaProducer[K, V](properties: Properties, - 
keySerializer: Serializer[K], - valueSerializer: Serializer[V]): KafkaProducer[K, V] = { - if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) { - properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") - } - new KafkaProducer[K, V](properties, keySerializer, valueSerializer) - } - - def buildProducerConfig(bootstrapServers: String): Properties = { - val properties = new Properties() - properties.setProperty("bootstrap.servers", bootstrapServers) - properties - } - - def buildConsumerConfig(zkConnect: String): Properties = { - val properties = new Properties() - properties.setProperty("zookeeper.connect", zkConnect) - properties.setProperty("group.id", "gearpump") - properties - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala deleted file mode 100644 index ce17f5a..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib.consumer - -/** - * someone sleeps for exponentially increasing duration each time - * until the cap - * - * @param backOffMultiplier The factor by which the duration increases. - * @param initialDurationMs Time in milliseconds for initial sleep. - * @param maximumDurationMs Cap up to which we will increase the duration. - */ -private[consumer] class ExponentialBackoffSleeper( - backOffMultiplier: Double = 2.0, - initialDurationMs: Long = 100, - maximumDurationMs: Long = 10000) { - - require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1") - require(initialDurationMs > 0, "initialDurationMs must be positive") - require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be >= initialDurationMs") - - private var sleepDuration = initialDurationMs - - def reset(): Unit = { - sleepDuration = initialDurationMs - } - - def sleep(): Unit = { - Thread.sleep(sleepDuration) - setNextSleepDuration() - } - - def getSleepDuration: Long = sleepDuration - - def setNextSleepDuration(): Unit = { - val next = (sleepDuration * backOffMultiplier).asInstanceOf[Long] - sleepDuration = math.min(math.max(initialDurationMs, next), maximumDurationMs) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala ---------------------------------------------------------------------- diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala deleted file mode 100644 index 8550207..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.kafka.lib.consumer - -import java.nio.channels.ClosedByInterruptException -import java.util.concurrent.LinkedBlockingQueue - -import kafka.common.TopicAndPartition -import kafka.consumer.ConsumerConfig -import org.slf4j.Logger - -import org.apache.gearpump.util.LogUtil - -object FetchThread { - private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread]) - - def apply(topicAndPartitions: Array[TopicAndPartition], - fetchThreshold: Int, - fetchSleepMS: Long, - startOffsetTime: Long, - consumerConfig: ConsumerConfig): FetchThread = { - val createConsumer = (tp: TopicAndPartition) => - KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig) - - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - new FetchThread(topicAndPartitions, createConsumer, incomingQueue, fetchThreshold, fetchSleepMS) - } -} - -/** - * A thread to fetch messages from multiple kafka org.apache.kafka.TopicAndPartition and puts them - * onto a queue, which is asynchronously polled by a consumer - * - * @param createConsumer given a org.apache.kafka.TopicAndPartition, create a - * [[org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to - * connect to it - * @param incomingQueue a queue to buffer incoming messages - * @param fetchThreshold above which thread should stop fetching messages - * @param fetchSleepMS interval to sleep when no more messages or hitting fetchThreshold - */ -private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition], - createConsumer: TopicAndPartition => KafkaConsumer, - incomingQueue: LinkedBlockingQueue[KafkaMessage], - fetchThreshold: Int, - fetchSleepMS: Long) extends Thread { - import org.apache.gearpump.streaming.kafka.lib.consumer.FetchThread._ - - private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers - - def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = { - consumers(tp).setStartOffset(startOffset) - } - - def 
poll: Option[KafkaMessage] = { - Option(incomingQueue.poll()) - } - - override def run(): Unit = { - try { - var nextOffsets = Map.empty[TopicAndPartition, Long] - var reset = false - val sleeper = new ExponentialBackoffSleeper( - backOffMultiplier = 2.0, - initialDurationMs = 100L, - maximumDurationMs = 10000L) - while (!Thread.currentThread().isInterrupted) { - try { - if (reset) { - nextOffsets = consumers.mapValues(_.getNextOffset) - resetConsumers(nextOffsets) - reset = false - } - val hasMoreMessages = fetchMessage - sleeper.reset() - if (!hasMoreMessages) { - Thread.sleep(fetchSleepMS) - } - } catch { - case exception: Exception => - LOG.warn(s"resetting consumers due to $exception") - reset = true - sleeper.sleep() - } - } - } catch { - case e: InterruptedException => LOG.info("fetch thread got interrupted exception") - case e: ClosedByInterruptException => LOG.info("fetch thread closed by interrupt exception") - } finally { - consumers.values.foreach(_.close()) - } - } - - /** - * fetch message from each TopicAndPartition in a round-robin way - */ - def fetchMessage: Boolean = { - consumers.foldLeft(false) { (hasNext, tpAndConsumer) => - val (_, consumer) = tpAndConsumer - if (incomingQueue.size < fetchThreshold) { - if (consumer.hasNext) { - incomingQueue.put(consumer.next()) - true - } else { - hasNext - } - } else { - true - } - } - } - - private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = { - topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap - } - - private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = { - consumers.values.foreach(_.close()) - consumers = createAllConsumers - consumers.foreach { case (tp, consumer) => - consumer.setStartOffset(nextOffsets(tp)) - } - } -}
