[ https://issues.apache.org/jira/browse/BAHIR-183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708097#comment-16708097 ]
ASF GitHub Bot commented on BAHIR-183:
--------------------------------------

Github user yanlin-Lynn commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/72#discussion_r238509713

    --- Diff: sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/HDFSMQTTSourceProvider.scala ---
    @@ -0,0 +1,184 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.mqtt
    +
    +import java.util.Locale
    +
    +import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions}
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.SQLContext
    +import org.apache.spark.sql.execution.streaming.Source
    +import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    +
    +import org.apache.bahir.utils.MQTTConfig
    +
    +/**
    + * The provider class for creating MQTT source.
    + * This provider throw IllegalArgumentException if 'brokerUrl' or 'topic' parameter
    + * is not set in options.
    + */
    +class HDFSMQTTSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
    +
    +  override def sourceSchema(sqlContext: SQLContext, schema: Option[StructType],
    +    providerName: String, parameters: Map[String, String]): (String, StructType) = {
    +    ("mqtt", HDFSMQTTSourceProvider.SCHEMA_DEFAULT)
    +  }
    +
    +  override def createSource(sqlContext: SQLContext, metadataPath: String,
    +    schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source = {
    +
    +    def e(s: String) = new IllegalArgumentException(s)
    +
    +    val caseInsensitiveParameter = parameters.map{case (key: String, value: String) =>
    +      key.toLowerCase(Locale.ROOT) -> value
    +    }
    +
    +    val brokerUrl: String = getParameterValue(caseInsensitiveParameter, MQTTConfig.brokerUrl, "")
    --- End diff --

    Thanks for taking the time to review. This code is the same as what we use at our company; I will adjust it later!


> Using HDFS for saving message for mqtt source
> ---------------------------------------------
>
>                 Key: BAHIR-183
>                 URL: https://issues.apache.org/jira/browse/BAHIR-183
>             Project: Bahir
>          Issue Type: Improvement
>          Components: Spark Structured Streaming Connectors
>    Affects Versions: Spark-2.2.0
>            Reporter: Wang Yanlin
>            Priority: Major
>             Fix For: Spark-2.2.1
>
>
> Currently, in spark-sql-streaming-mqtt, received MQTT messages are saved to
> a local file by the driver. This risks losing data in cluster mode when an
> application master failover occurs. Saving incoming MQTT messages to a
> directory in the checkpoint would solve this problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)