[
https://issues.apache.org/jira/browse/GEARPUMP-122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15303673#comment-15303673
]
ASF GitHub Bot commented on GEARPUMP-122:
-----------------------------------------
Github user huafengw commented on a diff in the pull request:
https://github.com/apache/incubator-gearpump/pull/25#discussion_r64864833
--- Diff:
external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source
+
+import java.util.Properties
+
+import com.twitter.bijection.Injection
+import kafka.common.TopicAndPartition
+import org.apache.gearpump.streaming.kafka.KafkaSource
+import
org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
+import KafkaClient.KafkaClientFactory
+import
org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage,
FetchThread}
+import
org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import
org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.transaction.api._
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+import org.slf4j.Logger
+
+object AbstractKafkaSource {
+ private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource])
+}
+
+/**
+ * Contains implementation for Kafka source connectors, users should use
+ * [[org.apache.gearpump.streaming.kafka.KafkaSource]].
+ *
+ * This is a TimeReplayableSource which is able to replay messages given a
start time.
+ * Each kafka message is tagged with a timestamp by
+ * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and
the (timestamp, offset)
+ * mapping is stored to a
[[org.apache.gearpump.streaming.transaction.api.CheckpointStore]].
+ * On recovery, we could retrieve the previously stored offset from the
+ * [[org.apache.gearpump.streaming.transaction.api.CheckpointStore]] by
timestamp and start to read
+ * from there.
+ *
+ * kafka message is wrapped into gearpump [[org.apache.gearpump.Message]]
and further filtered by a
+ * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]]
+ * such that obsolete messages are dropped.
+ */
+abstract class AbstractKafkaSource(
+ topic: String,
+ props: Properties,
+ kafkaConfigFactory: KafkaConfigFactory,
+ kafkaClientFactory: KafkaClientFactory,
+ fetchThreadFactory: FetchThreadFactory)
+ extends TimeReplayableSource {
+ import
org.apache.gearpump.streaming.kafka.lib.source.AbstractKafkaSource._
+
+ def this(topic: String, properties: Properties) = {
+ this(topic, properties, new KafkaConfigFactory, KafkaClient.factory,
FetchThread.factory)
+ }
+
+ private lazy val config: KafkaConfig =
kafkaConfigFactory.getKafkaConfig(props)
+ private lazy val kafkaClient: KafkaClient =
kafkaClientFactory.getKafkaClient(config)
+ private lazy val fetchThread: FetchThread =
fetchThreadFactory.getFetchThread(config, kafkaClient)
+ private lazy val messageDecoder = config.getConfiguredInstance(
+ KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[MessageDecoder])
+ private lazy val timestampFilter = config.getConfiguredInstance(
+ KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG, classOf[TimeStampFilter])
+
+ private var startTime: Long = 0L
+ private var checkpointStoreFactory: Option[CheckpointStoreFactory] = None
+ private var checkpointStores: Map[TopicAndPartition, CheckpointStore] =
+ Map.empty[TopicAndPartition, CheckpointStore]
+
+ override def checkpoint(checkpointStoreFactory: CheckpointStoreFactory):
Unit = {
+ this.checkpointStoreFactory = Some(checkpointStoreFactory)
+ }
+
+ override def open(context: TaskContext, startTime: TimeStamp): Unit = {
+ import context.{parallelism, taskId}
+
+ LOG.info("KafkaSource opened at start time {}", startTime)
--- End diff --
all right, I missed that part.
> KafkaSource Stuck
> -----------------
>
> Key: GEARPUMP-122
> URL: https://issues.apache.org/jira/browse/GEARPUMP-122
> Project: Apache Gearpump
> Issue Type: Bug
> Components: kafka
> Affects Versions: 0.8.0
> Reporter: Qi Shu
> Assignee: Manu Zhang
> Fix For: 0.8.1
>
> Attachments: 2.png, screenshot-1.png
>
>
> Kafka's version is 2.10-0.8.2.0 and deployed on a cluster of 3 machines.
> Gearpump's version is 2.11-0.8.0 and deployed on local mode.
> The app running on gearpump is a java example Kafka2Kafka.
> The topic has millions of messages in it.
> After app started for a while, there were no messages received from kafka,
> but kafka console can receive messages from that topic.
> And if I use the kafka source without offset checkpoint, then messages begin
> to flow.
> "screenshot-1.png" means app started for 7 minutes and no messages received
> from kafka source.
> "2.png" was a screen shot of JProfiler, I use JProfiler to watch the thread
> of Gearpump, and it seems the kafka source got stuck when doing offset
> checkpoint.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)