[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406490#comment-15406490 ]

ASF GitHub Bot commented on BAHIR-39:
-------------------------------------

Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/13#discussion_r73405744
  
    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.sql.Timestamp
    +import java.text.SimpleDateFormat
    +import java.util.Calendar
    +import java.util.concurrent.CountDownLatch
    +
    +import scala.collection.concurrent.TrieMap
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.bahir.utils.Logging
    +import org.eclipse.paho.client.mqttv3._
    +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence}
    +
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
    +
    +
    +object MQTTStreamConstants {
    +
    +  val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    +
    +  val SCHEMA_DEFAULT = StructType(StructField("value", StringType)
    +    :: StructField("timestamp", TimestampType) :: Nil)
    +}
    +
    +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence,
    +    topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp),
    +    sqlContext: SQLContext) extends Source with Logging {
    +
    +  override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT
    +
    +  private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf)
    +
    +  private val messages = new TrieMap[Int, (String, Timestamp)]
    +
    +  private val initLock = new CountDownLatch(1)
    +
    +  private var offset = 0
    +
    +  private var client: MqttClient = _
    +
    +  private def fetchLastProcessedOffset(): Int = {
    +    Try(store.maxProcessedOffset) match {
    +      case Success(x) =>
    +        log.info(s"Recovering from last stored offset $x")
    +        x
    +      case Failure(e) => 0
    +    }
    +  }
    +
    +  initialize()
    +  private def initialize(): Unit = {
    +
    +    client = new MqttClient(brokerUrl, clientId, persistence)
    +    val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions()
    +    mqttConnectOptions.setAutomaticReconnect(true)
    +    // This is required to support recovery. TODO: configurable ?
    +    mqttConnectOptions.setCleanSession(false)
    +
    +    val callback = new MqttCallbackExtended() {
    +
    +      override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized {
    +        initLock.await() // Wait for initialization to complete.
    +        val temp = offset + 1
    +        messages.put(temp, messageParser(message.getPayload))
    +        offset = temp
    +        log.trace(s"Message arrived, $topic_ $message")
    +      }
    +
    +      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {
    +      }
    +
    +      override def connectionLost(cause: Throwable): Unit = {
    +        log.warn("Connection to mqtt server lost.", cause)
    +      }
    +
    +      override def connectComplete(reconnect: Boolean, serverURI: String): Unit = {
    +        log.info(s"Connect complete $serverURI. Is it a reconnect?: $reconnect")
    +      }
    +    }
    +    client.setCallback(callback)
    +    client.connect(mqttConnectOptions)
    +    client.subscribe(topic)
    +    // It is not possible to initialize offset without `client.connect`
    +    offset = fetchLastProcessedOffset()
    +    initLock.countDown() // Release.
    +  }
    +
    +  /** Stop this source and free any resources it has allocated. */
    +  override def stop(): Unit = {
    +    client.disconnect()
    +    persistence.close()
    +    client.close()
    +  }
    +
    +  /** Returns the maximum available offset for this source. */
    +  override def getOffset: Option[Offset] = {
    +    if (offset == 0) {
    +      None
    +    } else {
    +      Some(LongOffset(offset))
    +    }
    +  }
    +
    +  /**
    +   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
    +   * the batch should begin with the first available record. This method must always return the
    +   * same data for a particular `start` and `end` pair.
    +   */
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    +    val startIndex = start.getOrElse(LongOffset(0L)).asInstanceOf[LongOffset].offset.toInt
    +    val endIndex = end.asInstanceOf[LongOffset].offset.toInt
    +    val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
    +    // Move consumed messages to persistent store.
    +    (startIndex + 1 to endIndex).foreach { id =>
    +      val element: (String, Timestamp) = messages.getOrElse(id, store.retrieve(id))
    +      data += element
    +      store.store(id, element)
    +      messages.remove(id, element)
    +    }
    +    log.trace(s"Get Batch invoked, ${data.mkString}")
    +    import sqlContext.implicits._
    +    data.toDF("value", "timestamp")
    +  }
    +
    +}
    +
    +class MQTTStreamSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    +
    +  override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType],
    +      providerName: String, parameters: Map[String, String]): (String, StructType) = {
    +    ("mqtt", MQTTStreamConstants.SCHEMA_DEFAULT)
    +  }
    +
    +  override def createSource(sqlContext: SQLContext, metadataPath: String,
    +      schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source = {
    +
    +    def e(s: String) = new IllegalArgumentException(s)
    +
    +    val brokerUrl: String = parameters.getOrElse("brokerUrl", parameters.getOrElse("path",
    +      throw e("Please provide a `brokerUrl` by specifying path or .options(\"brokerUrl\",...)")))
    +
    +
    +    val persistence: MqttClientPersistence = parameters.get("persistence") match {
    +      case Some("memory") => new MemoryPersistence()
    +      case _ => val localStorage: Option[String] = parameters.get("localStorage")
    +        localStorage match {
    +          case Some(x) => new MqttDefaultFilePersistence(x)
    +          case None => new MqttDefaultFilePersistence()
    +        }
    +    }
    +
    +    val messageParserWithTimeStamp = (x: Array[Byte]) => (new String(x), Timestamp.valueOf(
    +      MQTTStreamConstants.DATE_FORMAT.format(Calendar.getInstance().getTime)))
    --- End diff ---
    
    An option here would be to use something like Spark's DateTimeUtils, which wraps these classes in a ThreadLocal to avoid concurrency issues, since SimpleDateFormat is not thread-safe.
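    
    For illustration, a minimal sketch of the ThreadLocal approach (the `ThreadSafeDateFormat` helper below is hypothetical, not the actual Spark DateTimeUtils API): each thread lazily gets its own SimpleDateFormat instance, so concurrent MQTT callbacks never share a mutable formatter.
    
        import java.text.SimpleDateFormat
    
        // Hypothetical helper: one SimpleDateFormat per thread, since
        // SimpleDateFormat keeps mutable state internally and must not be
        // shared across threads.
        object ThreadSafeDateFormat {
          private val format = new ThreadLocal[SimpleDateFormat] {
            override def initialValue(): SimpleDateFormat =
              new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          }
          def get: SimpleDateFormat = format.get()
        }
    
        // Usage, mirroring the parser in the diff:
        // ThreadSafeDateFormat.get.format(Calendar.getInstance().getTime)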


> MQTT as a streaming source for SQL Streaming.
> ---------------------------------------------
>
>                 Key: BAHIR-39
>                 URL: https://issues.apache.org/jira/browse/BAHIR-39
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark SQL Data Sources
>    Affects Versions: 2.1.0
>            Reporter: Prashant Sharma
>
> MQTT compatible streaming source for Spark SQL Streaming.
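
For context, a hypothetical usage sketch of this source, based only on the parameters visible in the diff (`brokerUrl`/`path`, `persistence`, `localStorage`); the `topic` option name is an assumption, since that wiring falls outside the excerpt:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("mqtt-demo").getOrCreate()

    // Read from an MQTT broker as a streaming DataFrame with the
    // (value, timestamp) schema defined in MQTTStreamConstants.
    val lines = spark.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("brokerUrl", "tcp://localhost:1883") // or pass it as the load() path
      .option("persistence", "memory")             // MemoryPersistence, per the diff
      .option("topic", "sensors/#")                // assumed option name for the topic
      .load()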



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
