[ 
https://issues.apache.org/jira/browse/BAHIR-89?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894009#comment-15894009
 ] 

ASF GitHub Bot commented on BAHIR-89:
-------------------------------------

Github user ckadner commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/37#discussion_r104118867
  
    --- Diff: 
streaming-mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala 
---
    @@ -199,7 +199,181 @@ object MQTTUtils {
         createStream(jssc.ssc, brokerUrl, topic, 
StorageLevel.MEMORY_AND_DISK_SER_2, Option(clientId),
           Option(username), Option(password), Option(cleanSession), None, 
None, None, None)
       }
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT 
publisher.
    +   * @param ssc           StreamingContext object
    +   * @param brokerUrl     Url of remote MQTT publisher
    +   * @param topics        Array of topic names to subscribe to
    +   * @param storageLevel  RDD storage level. Defaults to 
StorageLevel.MEMORY_AND_DISK_SER_2.
    +   */
    +  def createPairedStream(
    +      ssc: StreamingContext,
    +      brokerUrl: String,
    +      topics: Array[String],
    +      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    +    ): ReceiverInputDStream[(String, String)] = {
    +    new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel)
    +  }
    +
     
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT 
publisher.
    +   * @param ssc                StreamingContext object
    +   * @param brokerUrl          Url of remote MQTT publisher
    +   * @param topics             Array of topic names to subscribe to
    +   * @param storageLevel       RDD storage level. Defaults to 
StorageLevel.MEMORY_AND_DISK_SER_2.
    +   * @param clientId           ClientId to use for the mqtt connection
    +   * @param username           Username for authentication to the mqtt 
publisher
    +   * @param password           Password for authentication to the mqtt 
publisher
    +   * @param cleanSession       Sets the mqtt cleanSession parameter
    +   * @param qos                Quality of service to use for the topic 
subscription
    +   * @param connectionTimeout  Connection timeout for the mqtt connection
    +   * @param keepAliveInterval  Keepalive interal for the mqtt connection
    +   * @param mqttVersion        Version to use for the mqtt connection
    +   */
    +  def createPairedStream(
    +      ssc: StreamingContext,
    +      brokerUrl: String,
    +      topics: Array[String],
    +      storageLevel: StorageLevel,
    +      clientId: Option[String],
    +      username: Option[String],
    +      password: Option[String],
    +      cleanSession: Option[Boolean],
    +      qos: Option[Int],
    +      connectionTimeout: Option[Int],
    +      keepAliveInterval: Option[Int],
    +      mqttVersion: Option[Int]
    +    ): ReceiverInputDStream[(String, String)] = {
    +    new MQTTPairedInputDStream(ssc, brokerUrl, topics, storageLevel, 
clientId, username, password,
    +          cleanSession, qos, connectionTimeout, keepAliveInterval, 
mqttVersion)
    +  }
    +
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT 
publisher.
    +   * Storage level of the data will be the default 
StorageLevel.MEMORY_AND_DISK_SER_2.
    +   * @param jssc      JavaStreamingContext object
    +   * @param brokerUrl Url of remote MQTT publisher
    +   * @param topic     Array of topic names to subscribe to
    +   */
    +  def createPairedStream(
    +      jssc: JavaStreamingContext,
    +      brokerUrl: String,
    +      topics: Array[String]
    +    ): JavaReceiverInputDStream[(String, String)] = {
    +    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
    +    createPairedStream(jssc.ssc, brokerUrl, topics)
    +  }
    +
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT 
publisher.
    +   * @param jssc          JavaStreamingContext object
    +   * @param brokerUrl     Url of remote MQTT publisher
    +   * @param topics        Array of topic names to subscribe to
    +   * @param storageLevel  RDD storage level.
    +   */
    +  def createPairedStream(
    +      jssc: JavaStreamingContext,
    +      brokerUrl: String,
    +      topics: Array[String],
    +      storageLevel: StorageLevel
    +    ): JavaReceiverInputDStream[(String, String)] = {
    +    implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
    +    createPairedStream(jssc.ssc, brokerUrl, topics, storageLevel)
    +  }
    +
    +  /**
    +   * Create an input stream that receives messages pushed by a MQTT 
publisher.
    +   * @param jssc               JavaStreamingContext object
    +   * @param brokerUrl          Url of remote MQTT publisher
    +   * @param topic              Array of topic names to subscribe to
    --- End diff --
    
    should be @param topics (plural)


> New API for subscribing from a list of MQTT topics and Return tuple of 
> <Topic,Message> as output
> ------------------------------------------------------------------------------------------------
>
>                 Key: BAHIR-89
>                 URL: https://issues.apache.org/jira/browse/BAHIR-89
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark Streaming Connectors
>    Affects Versions: Not Applicable
>         Environment: Spark Streaming MQTT Connector
>            Reporter: Anntinu Josy
>              Labels: MQTT, SPARK
>             Fix For: Not Applicable
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I am working on an IoT project. As part of developing an MQTT-Kafka bridge 
> program I used Bahir. I feel it would be a good feature to provide a new API 
> that supports a list of MQTT topics as input and returns a tuple of <Topic, 
> Message> as output. This would be useful for reducing resource usage when 
> subscribing to multiple topics. I have developed this feature and would like to integrate it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to