[ 
https://issues.apache.org/jira/browse/BAHIR-97?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957651#comment-15957651
 ] 

ASF GitHub Bot commented on BAHIR-97:
-------------------------------------

Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/38#discussion_r110021902
  
    --- Diff: 
sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
 ---
    @@ -0,0 +1,188 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.bahir.sql.streaming.akka
    +
    +import java.io.File
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.scalatest.BeforeAndAfter
    +
    +import org.apache.spark.{SharedSparkContext, SparkFunSuite}
    +import org.apache.spark.sql.{DataFrame, SQLContext}
    +import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
    +import org.apache.spark.sql.execution.streaming.LongOffset
    +
    +import org.apache.bahir.utils.BahirUtils
    +
    +class AkkaStreamSourceSuite extends SparkFunSuite with SharedSparkContext 
with BeforeAndAfter {
    +
    +  protected var akkaTestUtils: AkkaTestUtils = _
    +  protected val tempDir: File =
    +    new File(System.getProperty("java.io.tmpdir") + 
"/spark-akka-persistence")
    +
    +  akkaTestUtils = new AkkaTestUtils
    +  akkaTestUtils.setup()
    +
    +  before {
    +    tempDir.mkdirs()
    +  }
    +
    +  after {
    +    Persistence.close()
    +    BahirUtils.recursiveDeleteDir(tempDir)
    +  }
    +
    +  protected val tmpDir: String = tempDir.getAbsolutePath
    +
    +  protected def createStreamingDataframe(dir: String = tmpDir): 
(SQLContext, DataFrame) = {
    +
    +    val sqlContext: SQLContext = new SQLContext(sc)
    +
    +    sqlContext.setConf("spark.sql.streaming.checkpointLocation", dir + 
"/checkpoint")
    +
    +    val dataFrame: DataFrame =
    +      
sqlContext.readStream.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
    +        .option("urlOfPublisher", akkaTestUtils.getFeederActorUri())
    +        .option("persistenceDirPath", dir + "/persistence").load()
    +    (sqlContext, dataFrame)
    +  }
    +}
    +
    +class BasicAkkaSourceSuite extends AkkaStreamSourceSuite {
    +
    +  private def writeStreamResults(sqlContext: SQLContext, dataFrame: 
DataFrame,
    +                                 waitDuration: Long): Boolean = {
    +    import sqlContext.implicits._
    +    dataFrame.as[(String, Timestamp)].writeStream.format("parquet")
    +      .start(s"$tmpDir/parquet/t.parquet").awaitTermination(waitDuration)
    +  }
    +
    +  private def readBackSreamingResults(sqlContext: SQLContext): 
mutable.Buffer[String] = {
    +    import sqlContext.implicits._
    +    val asList =
    +      sqlContext.read.schema(AkkaStreamConstants.SCHEMA_DEFAULT)
    +      .parquet(s"$tmpDir/parquet/t.parquet").as[(String, 
Timestamp)].map(_._1)
    +      .collectAsList().asScala
    +    asList
    +  }
    +
    +  test("basic usage") {
    +    val message = "Akka is a reactive framework"
    +
    +    akkaTestUtils.setMessage(message)
    +    akkaTestUtils.setCountOfMessages(1)
    +
    +    val (sqlContext: SQLContext, dataFrame: DataFrame) = 
createStreamingDataframe()
    +
    +    writeStreamResults(sqlContext, dataFrame, 10000)
    +
    +    val resultBuffer: mutable.Buffer[String] = 
readBackSreamingResults(sqlContext)
    +
    +    assert(resultBuffer.size === 1)
    +    assert(resultBuffer.head === message)
    +  }
    +
    +  test("Send and receive 100 messages.") {
    +    val message = "Akka is a reactive framework"
    +
    +    akkaTestUtils.setMessage(message)
    +    akkaTestUtils.setCountOfMessages(100)
    +
    +    val (sqlContext: SQLContext, dataFrame: DataFrame) = 
createStreamingDataframe()
    +
    +    writeStreamResults(sqlContext, dataFrame, 10000)
    +
    +    val resultBuffer: mutable.Buffer[String] = 
readBackSreamingResults(sqlContext)
    +
    +    assert(resultBuffer.size === 100)
    +    assert(resultBuffer.head === message)
    +  }
    +
    +  test("params not provided") {
    +    val persistenceDirPath = tempDir.getAbsolutePath + "/persistence"
    +
    +    val provider = new AkkaStreamSourceProvider
    +    val sqlContext: SQLContext = new SQLContext(sc)
    +
    --- End diff --
    
    Would you please change to use Sparksession instead of directly 
instantiating a SQLContext.


> Akka as a streaming source for SQL Streaming.
> ---------------------------------------------
>
>                 Key: BAHIR-97
>                 URL: https://issues.apache.org/jira/browse/BAHIR-97
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Spark SQL Data Sources
>    Affects Versions: Spark-2.1.0
>            Reporter: Subhobrata Dey
>
> Hello,
> This issue is created to propose the addition of Akka compatible streaming 
> source for Spark SQL Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to