[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15409006#comment-15409006 ]
ASF GitHub Bot commented on BAHIR-39: ------------------------------------- Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73649345 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + + val SCHEMA_DEFAULT = StructType(StructField("value", StringType) + :: StructField("timestamp", TimestampType) :: Nil) +} + +class MQTTTextStreamSource(brokerUrl: String, persistence: MqttClientPersistence, + topic: String, clientId: String, messageParser: Array[Byte] => (String, Timestamp), + sqlContext: SQLContext) extends Source with Logging { + + override def schema: StructType = MQTTStreamConstants.SCHEMA_DEFAULT + + private val store = new LocalMessageStore(persistence, sqlContext.sparkContext.getConf) + + private val messages = new TrieMap[Int, (String, Timestamp)] + + private val initLock = new CountDownLatch(1) + + private var offset = 0 + + private var client: MqttClient = _ + + private def fetchLastProcessedOffset(): Int = { + Try(store.maxProcessedOffset) match { + case Success(x) => + log.info(s"Recovering from last stored offset $x") + x + case Failure(e) => 0 + } + } + + initialize() + private def initialize(): Unit = { + + client = new MqttClient(brokerUrl, clientId, persistence) + val mqttConnectOptions: MqttConnectOptions = new MqttConnectOptions() + 
mqttConnectOptions.setAutomaticReconnect(true) + // This is required to support recovery. TODO: configurable ? + mqttConnectOptions.setCleanSession(false) + + val callback = new MqttCallbackExtended() { + + override def messageArrived(topic_ : String, message: MqttMessage): Unit = synchronized { + initLock.await() // Wait for initialization to complete. + val temp = offset + 1 + messages.put(temp, messageParser(message.getPayload)) + offset = temp + log.trace(s"Message arrived, $topic_ $message") + } + + override def deliveryComplete(token: IMqttDeliveryToken): Unit = { --- End diff -- This is never called in the case of a receiver-only client. > MQTT as a streaming source for SQL Streaming. > --------------------------------------------- > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources > Affects Versions: 2.1.0 > Reporter: Prashant Sharma > > MQTT-compatible streaming source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)