[ https://issues.apache.org/jira/browse/BAHIR-39?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406487#comment-15406487 ]
ASF GitHub Bot commented on BAHIR-39: ------------------------------------- Github user lresende commented on a diff in the pull request: https://github.com/apache/bahir/pull/13#discussion_r73405558 --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bahir.sql.streaming.mqtt + +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.Calendar +import java.util.concurrent.CountDownLatch + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success, Try} + +import org.apache.bahir.utils.Logging +import org.eclipse.paho.client.mqttv3._ +import org.eclipse.paho.client.mqttv3.persist.{MemoryPersistence, MqttDefaultFilePersistence} + +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source} +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} +import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + + +object MQTTStreamConstants { + + val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") --- End diff -- Not thread safe, see comments below on calendar usage... > MQTT as a streaming source for SQL Streaming. > --------------------------------------------- > > Key: BAHIR-39 > URL: https://issues.apache.org/jira/browse/BAHIR-39 > Project: Bahir > Issue Type: New Feature > Components: Spark SQL Data Sources > Affects Versions: 2.1.0 > Reporter: Prashant Sharma > > MQTT compatible streaming source for Spark SQL Streaming. -- This message was sent by Atlassian JIRA (v6.3.4#6332)