Thank you very much for your response. I asked a pro for help, and below is the
sample code (against the sample SSB project) that I would like to contribute,
in case it helps someone who runs into the same issue as me:

==============================


import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.json4s.jackson.JsonMethods
import org.json4s.{DefaultFormats, Formats}

import java.io.{BufferedReader, DataOutputStream, InputStreamReader}
import java.net.{HttpURLConnection, URL}
import java.util.Base64

object APIKylinRunSQL {

  val KYLIN_QUERY_URL = "http://localhost:7070/kylin/api/query"
  val USER_NAME = "xxxxx"
  val PASSWORD = "yyyyy"
  val KYLIN_PROJECT = "learn_kylin"

  val spark = SparkSession.builder
    .master("local")
    .appName("Convert JSON to DataFrame")
    .getOrCreate()

  def main(args: Array[String]): Unit = {


    // Use a Seq of (label, query) pairs rather than a Map: a Map would
    // silently drop the earlier entry for duplicate keys such as "PART".
    val tablesAndQueries = Seq(
      "CUSTOMER" -> "select * from SSB.CUSTOMER",
      "DATES" -> "SELECT * FROM SSB.DATES",
      "PART" -> "SELECT * FROM SSB.PART",
      "P_LINEORDER" -> "SELECT * FROM SSB.P_LINEORDER",
      "SUPPLIER" -> "SELECT * FROM SSB.SUPPLIER",
      "P_LINEORDER" -> "SELECT lo_orderdate, count(1) FROM SSB.P_LINEORDER GROUP BY lo_orderdate",
      "PART" -> "SELECT P_COLOR, count(1) FROM SSB.PART group by P_COLOR"
    )

    // number of times to repeat the full set of queries
    val numberOfExecutions = 15

    // run every query once per iteration
    for (i <- 1 to numberOfExecutions) {
      println(s"Run $i of $numberOfExecutions")
      for ((table, query) <- tablesAndQueries) {
        println(s"Executing query for $table")

        println(query)

        executeQuery(query)
        // wait one second between requests
        Thread.sleep(1000)
      }
    }

  }

  def executeQuery(sqlQuery: String): Unit = {

    // Build the request body. The SQL is interpolated directly into the JSON
    // string, which works for these queries but would break on SQL containing
    // double quotes (see the json4s sketch after this listing).
    val queryJson =
      s"""
         |{
         |  "project": "$KYLIN_PROJECT",
         |  "sql": "$sqlQuery"
         |}
         |""".stripMargin

    // Encode the username and password for basic authentication
    val encodedAuth = Base64.getEncoder.encodeToString(s"$USER_NAME:$PASSWORD".getBytes)

    val url = new URL(KYLIN_QUERY_URL)
    val connection = url.openConnection.asInstanceOf[HttpURLConnection]

    connection.setRequestMethod("POST")
    connection.setRequestProperty("Authorization", s"Basic $encodedAuth")
    connection.setRequestProperty("Content-Type", "application/json")
    connection.setRequestProperty("Accept", "application/json")
    connection.setDoOutput(true)

    val outputStream = connection.getOutputStream
    val writer = new DataOutputStream(outputStream)
    writer.write(queryJson.getBytes("UTF-8"))
    writer.flush()
    writer.close()

    val responseCode = connection.getResponseCode

    if (responseCode == HttpURLConnection.HTTP_OK) {
      val inputStream = connection.getInputStream
      val reader = new BufferedReader(new InputStreamReader(inputStream))
      // read the full response body line by line
      val response = new StringBuilder
      var inputLine = reader.readLine()
      while (inputLine != null) {
        response.append(inputLine)
        inputLine = reader.readLine()
      }
      reader.close()
      println("Result:")
      println(response.toString)

      connection.disconnect()

      // parse JSON
      implicit val formats: Formats = DefaultFormats
      val parsedJson = JsonMethods.parse(response.toString)

      val columns = (parsedJson \ "columnMetas")
        .extract[List[Map[String, Any]]]

      // dynamically build the schema from the column names in the JSON;
      // every column is declared as StringType for simplicity
      val schema = StructType(columns.map { col =>
        val columnName = col("name").asInstanceOf[String]
        StructField(columnName, StringType, nullable = true)
      })

      schema.printTreeString()

      // extract the result rows from the JSON
      val data = (parsedJson \ "results").extract[List[List[Any]]]

      // convert the rows to RDD[Row]; render each value as a (null-safe)
      // string so the rows match the all-StringType schema declared above
      val rowsRDD = spark.sparkContext.parallelize(
        data.map(row => Row.fromSeq(row.map(v => if (v == null) null else v.toString))))

      val df = spark.createDataFrame(rowsRDD, schema)

      df.show(20, false)

    } else {
      println(s"Error: $responseCode")
      connection.disconnect()
    }
  }
}
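
One caveat with the listing above: the SQL is spliced into the JSON body by
string interpolation, so a query containing double quotes or newlines would
produce an invalid request. A minimal sketch of a safer variant, using the
json4s dependency that is already imported and letting the library do the
escaping:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Build the request body with json4s so special characters in the SQL
// (quotes, newlines) are escaped instead of breaking the JSON.
def buildQueryJson(project: String, sqlQuery: String): String =
  compact(render(("project" -> project) ~ ("sql" -> sqlQuery)))

With that helper inside the object, the interpolated queryJson above could be
replaced by buildQueryJson(KYLIN_PROJECT, sqlQuery).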

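The schema above declares every column as StringType, which is the simplest
thing that works. If typed columns are needed, the column metadata can drive
the schema instead; a rough sketch, assuming the columnMetas entries in
Kylin's response carry a "columnTypeName" field (worth verifying against your
Kylin version):

import org.apache.spark.sql.types._

// Map a Kylin column type name (assumed to be in the "columnTypeName" field)
// to a Spark type; anything unrecognised falls back to StringType.
def toSparkType(kylinTypeName: String): DataType = kylinTypeName.toUpperCase match {
  case "TINYINT" | "SMALLINT" | "INTEGER" | "INT" => IntegerType
  case "BIGINT"                                   => LongType
  case "FLOAT" | "REAL"                           => FloatType
  case "DOUBLE"                                   => DoubleType
  case "DATE"                                     => DateType
  case "TIMESTAMP"                                => TimestampType
  case _                                          => StringType
}

Note that the row values would then also have to be converted to the matching
JVM types before createDataFrame; the all-string approach in the listing
sidesteps that conversion.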

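For completeness, the JDBC route that Lu Cao points to below also works and
avoids the manual JSON parsing entirely. A rough sketch, assuming the Kylin
JDBC driver (org.apache.kylin.jdbc.Driver) is on the classpath and that the
URL, project, and credentials are adapted to your deployment:

// Read a Kylin query result straight into a DataFrame via Spark's JDBC source.
// The URL format and driver class below are assumptions to adjust as needed.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:kylin://localhost:7070/learn_kylin")
  .option("driver", "org.apache.kylin.jdbc.Driver")
  .option("query", "SELECT P_COLOR, count(1) FROM SSB.PART GROUP BY P_COLOR")
  .option("user", USER_NAME)
  .option("password", PASSWORD)
  .load()

jdbcDF.show(20, false)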
On Sun, Mar 31, 2024 at 8:57 PM Lionel CL <whuca...@outlook.com> wrote:

> Hi Nam,
> You can refer to the spark docs
> https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
>
> Regards,
> Lu Cao
>
> From: Nam Đỗ Duy <na...@vnpay.vn.INVALID>
> Date: Sunday, March 31, 2024 at 08:53
> To: dev <dev@kylin.apache.org>, u...@kylin.apache.org <
> u...@kylin.apache.org>
> Subject: Re: How to query the Cube via API and use the dataset for other
> purpose
> Dear Sirs/Madames
>
> Could anyone here help me figure out how to use Scala to run a select SQL
> query against a Kylin cube via the API, and then turn that result into a
> DataFrame in Scala for other purposes?
>
> Thank you so much for your time!
>
> Best regards
>
> On Fri, 29 Mar 2024 at 17:52 Nam Đỗ Duy <na...@vnpay.vn> wrote:
>
> > Hi Xiaoxiang,
> > Sir & Madames,
> >
> > I use the following code to query the cube via the API, but I cannot use
> > the result as a DataFrame. Could you suggest a way to do that? It is
> > very important for our project.
> >
> > Thanks and best regards
> >
> > ===================================
> >
> > import org.apache.spark.sql.{DataFrame, SparkSession}
> > import org.apache.spark.sql.functions._
> >
> > object APICaller {
> >   def main(args: Array[String]): Unit = {
> >     val spark = SparkSession.builder()
> >       .appName("APICaller")
> >       .master("local[*]")
> >       .getOrCreate()
> >
> >     import spark.implicits._
> >
> >     val username = "namdd"
> >     val password = "eer123"
> >     val urlString = "http://localhost:7070/kylin/api/query";
> >     val project = "learn_kylin"
> >     val query = "select count(*) from HIVE_DWH_STANDARD.factuserEvent"
> >
> >     val response: String = callAPI(urlString, username, password, project, query)
> >
> >     // Convert response to DataFrame
> >     val df = spark.read.json(Seq(response).toDS())
> >
> >     // Show DataFrame
> >     df.show()
> >
> >     // Stop Spark session
> >     spark.stop()
> >   }
> >
> >   def callAPI(url: String, username: String, password: String, project:
> > String, query: String): String = {
> >     val encodedAuth =
> >       java.util.Base64.getEncoder.encodeToString(s"$username:$password".getBytes)
> >
> >     val connection = scalaj.http.Http(url)
> >       .postData(s"""{"project": "$project", "sql": "$query"}""")
> >       .header("Content-Type", "application/json")
> >       .header("Accept", "application/json")
> >       .auth(username, password)
> >       .asString
> >
> >     if (connection.isError)
> >       throw new RuntimeException(s"Error calling API: ${connection.body}")
> >
> >     connection.body
> >   }
> > }
> >
> >
>
