[ 
https://issues.apache.org/jira/browse/ROCKETMQ-81?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982363#comment-15982363
 ] 

ASF GitHub Bot commented on ROCKETMQ-81:
----------------------------------------

Github user hustfxj commented on a diff in the pull request:

    
https://github.com/apache/incubator-rocketmq-externals/pull/4#discussion_r113107521
  
    --- Diff: 
rocketmq-spark/src/main/scala/org/apache/rocketmq/spark/streaming/MQPullInputDStream.scala
 ---
    @@ -0,0 +1,535 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming
    +
    +import java.util.concurrent.atomic.AtomicReference
    +import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
    +import java.{lang => jl, util => ju}
    +
    +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer
    +import org.apache.rocketmq.client.consumer.store.ReadOffsetType
    +import org.apache.rocketmq.common.MixAll
    +import org.apache.rocketmq.common.message.{MessageExt, MessageQueue}
    +import org.apache.rocketmq.spark.{ConsumerStrategy, _}
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.streaming.dstream.{DStream, DStreamCheckpointData, 
InputDStream}
    +import org.apache.spark.streaming.scheduler.rate.RateEstimator
    +import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
    +import org.apache.spark.util.ThreadUtils
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +/**
    +  *  A DStream where
    +  * each given RocketMq topic/queueId corresponds to an RDD partition.
    +  * The configuration pull.max.speed.per.partition gives the maximum number
    +  *  of messages per second that each '''partition''' will accept.
    +  * @param groupId it is for rocketMq for identifying the consumer
    +  * @param topics the topics for the rocketmq
    +  * @param locationStrategy locationStrategy In most cases, pass in 
[[LocationStrategy.PreferConsistent]],
    +  *   see [[LocationStrategy]] for more details.
    +  * @param consumerStrategy consumerStrategy In most cases, pass in 
[[ConsumerStrategy.lastest]],
    +  *   see [[ConsumerStrategy]] for more details
    +  * @param autoCommit  whether commit the offset to the rocketmq server 
automatically or not
    +  * @param forceSpecial Generally if the rocketmq server has checkpoint 
for the [[MessageQueue]], then the consumer
    +  *  will consume from the checkpoint no matter we specify the offset or 
not. But if forceSpecial is true,
    +  *  the rocketmq will start consuming from the specific available offset 
in any case.
    +  * @param failOnDataLoss Zero data loss is not guaranteed when topics are 
deleted. If zero data loss is critical, 
    +  * the user must make sure all messages in a topic have been processed 
when deleting a topic.
    +  */
    +class MQPullInputDStream(
    +    _ssc: StreamingContext,
    +    groupId: String,
    +    topics: ju.Collection[jl.String],
    +    optionParams: ju.Map[String, String],
    +    locationStrategy: LocationStrategy,
    +    consumerStrategy: ConsumerStrategy,
    +    autoCommit: Boolean,
    +    forceSpecial: Boolean,
    +    failOnDataLoss: Boolean
    +  ) extends InputDStream[MessageExt](_ssc) with CanCommitOffsets{
    +
    +  private var currentOffsets = mutable.Map[TopicQueueId, Map[String, 
Long]]()
    +
    +  private val commitQueue = new ConcurrentLinkedQueue[OffsetRange]
    +
    +  private val commitCallback = new AtomicReference[OffsetCommitCallback]
    +
    +  private val maxRateLimitPerPartition = 
optionParams.getOrDefault(RocketMQConfig.MAX_PULL_SPEED_PER_PARTITION,
    +    "-1").toInt
    +  
    +  @transient private var kc: DefaultMQPullConsumer = null
    +
    +  /**
    +    * Start up a timer thread to persist the OffsetStore
    +    */
    +  private val scheduledExecutorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
    --- End diff --
    
    add @transient for this val


> Add the RocketMq plugin for the Apache Spark
> --------------------------------------------
>
>                 Key: ROCKETMQ-81
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-81
>             Project: Apache RocketMQ
>          Issue Type: Task
>          Components: rocketmq-externals
>    Affects Versions: 4.1.0-incubating
>            Reporter: Longda Feng
>            Assignee: Xin Wang
>            Priority: Minor
>
> Since Apache RocketMQ 4.0 will be released in the next few days, we can 
> start adding the RocketMQ plugin for Apache Spark.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to