[
https://issues.apache.org/jira/browse/ROCKETMQ-81?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982363#comment-15982363
]
ASF GitHub Bot commented on ROCKETMQ-81:
----------------------------------------
Github user hustfxj commented on a diff in the pull request:
https://github.com/apache/incubator-rocketmq-externals/pull/4#discussion_r113107521
--- Diff:
rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala
---
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
+import java.{lang => jl, util => ju}
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType
+import org.apache.rocketmq.common.MixAll
+import org.apache.rocketmq.common.message.{MessageExt, MessageQueue}
+import org.apache.rocketmq.spark.{ConsumerStrategy, _}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.{DStream, DStreamCheckpointData, InputDStream}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.util.ThreadUtils
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * A DStream where
+ * each given RocketMq topic/queueId corresponds to an RDD partition.
+ * The configuration pull.max.speed.per.partition gives the maximum number
+ * of messages per second that each '''partition''' will accept.
+ * @param groupId the consumer group id used by RocketMQ to identify this consumer
+ * @param topics the RocketMQ topics to subscribe to
+ * @param locationStrategy In most cases, pass in [[LocationStrategy.PreferConsistent]],
+ * see [[LocationStrategy]] for more details.
+ * @param consumerStrategy In most cases, pass in [[ConsumerStrategy.lastest]],
+ * see [[ConsumerStrategy]] for more details
+ * @param autoCommit whether to commit the offset to the RocketMQ server automatically
+ * @param forceSpecial Generally, if the RocketMQ server has a checkpoint for the [[MessageQueue]], the consumer
+ * will consume from the checkpoint regardless of whether we specify an offset. But if forceSpecial is true,
+ * RocketMQ will start consuming from the specified available offset in any case.
+ * @param failOnDataLoss Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
+ * the user must make sure all messages in a topic have been processed before deleting the topic.
+ */
+class MQPullInputDStream(
+ _ssc: StreamingContext,
+ groupId: String,
+ topics: ju.Collection[jl.String],
+ optionParams: ju.Map[String, String],
+ locationStrategy: LocationStrategy,
+ consumerStrategy: ConsumerStrategy,
+ autoCommit: Boolean,
+ forceSpecial: Boolean,
+ failOnDataLoss: Boolean
+ ) extends InputDStream[MessageExt](_ssc) with CanCommitOffsets{
+
+ private var currentOffsets = mutable.Map[TopicQueueId, Map[String, Long]]()
+
+ private val commitQueue = new ConcurrentLinkedQueue[OffsetRange]
+
+ private val commitCallback = new AtomicReference[OffsetCommitCallback]
+
+ private val maxRateLimitPerPartition = optionParams.getOrDefault(RocketMQConfig.MAX_PULL_SPEED_PER_PARTITION,
+ "-1").toInt
+
+ @transient private var kc: DefaultMQPullConsumer = null
+
+ /**
+ * start up a timer thread to persist the OffsetStore
+ */
+ private val scheduledExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
--- End diff ---
add @transient for this val
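The point of the review: `scheduledExecutorService` lives on the driver and holds a thread pool, which is not serializable, so serializing the DStream (e.g. for checkpointing) would fail unless the field is marked `@transient`. A minimal, self-contained sketch of the failure mode and the fix — the class and object names here are hypothetical illustrations, not code from the PR:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.concurrent.{Executors, ScheduledExecutorService}

// A driver-side component holding a thread pool, analogous to the DStream's
// scheduledExecutorService. ScheduledExecutorService is not Serializable, so
// without @transient, Java serialization of the enclosing object would throw
// java.io.NotSerializableException.
class OffsetPersister extends Serializable {
  @transient private var executor: ScheduledExecutorService = _

  // Lazily (re)create the executor: after deserialization the @transient
  // field comes back as null, so callers must go through this accessor.
  def scheduler: ScheduledExecutorService = {
    if (executor == null) executor = Executors.newSingleThreadScheduledExecutor()
    executor
  }
}

object TransientDemo {
  // Plain Java serialization, the mechanism Spark uses for the DStream graph.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val p = new OffsetPersister
    p.scheduler // force creation of the non-serializable executor
    // Succeeds only because the executor field is @transient.
    val payload = serialize(p)
    println(s"serialized ${payload.length} bytes")
    p.scheduler.shutdown()
  }
}
```

The lazy accessor pattern matters as much as the annotation itself: a `@transient val` is silently null after deserialization, so any code that touches the field on a deserialized instance must be able to recreate it.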
> Add the RocketMq plugin for the Apache Spark
> --------------------------------------------
>
> Key: ROCKETMQ-81
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-81
> Project: Apache RocketMQ
> Issue Type: Task
> Components: rocketmq-externals
> Affects Versions: 4.1.0-incubating
> Reporter: Longda Feng
> Assignee: Xin Wang
> Priority: Minor
>
> Since the Apache RocketMq 4.0 will be released in the next few days, we can
> start the job of adding the RocketMq plugin for the Apache Spark.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)