Hello,

I’m knew to Flink and I need some help.  I’d like to use the SQL API for 
processing an incoming stream that has the following characteristics:

  *   Each stream record has a key
  *   The record can be updated
  *   The record is of the form: reviewId -> (productId, rating)

For the above stream, I want to compute the average rating for each product ID. 
 The key is the reviewId
With the SQL API, I get incorrect results.  However, I’ve been able to make it 
work through the use of RichFlatMapFunction and the Datastream API.

Below is the entire code listing, which does not work.  I know I’m missing the 
definition/use of a primary key so that an update on the same key can occur.
However, I’m not sure how to go about doing this.  Any help/comments are 
welcome.

Thank you,

Ghassan


package com.bazaarvoice.flink_poc

import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation, 
_}
import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
import org.apache.flink.table.api.scala._

package object flink_poc{
  type ProductId = String
  type ReviewId = String
}

case class SummaryReview(reviewId: ReviewId, productId: ProductId, approved: 
Boolean, rating: Double) extends Serializable {
  override def toString: String = {
    s"$reviewId,  $productId,  ${if (approved) "APPROVED" else "REJECTED"},  
$rating"
  }
}

object AverageRatingWithSQL {

  def main(args: Array[String]) {

    val events = List(
      SummaryReview("review-1", "product-100", approved = true, 1),
      SummaryReview("review-2", "product-100", approved = true, 4),
      SummaryReview("review-3", "product-100", approved = true, 6),
      SummaryReview("review-3", "product-100", approved = true, 3)    // <-- 
this should override the previous record
    ).toSeq
    // Average rating should be equal to (1+4+3)/3 = 2.666667

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val inputStream: DataStream[SummaryReview] = env.fromCollection(events)

    val tableEnv = TableEnvironment.getTableEnvironment(env)

    tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID, 'ProductID, 
'Approved, 'Rating)

    val resultTable = tableEnv.sql(
      "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP 
BY ProductID"
    )

    val typeInfo = createTypeInformation[(ProductId, Double)]
    val outStream = resultTable.toRetractStream(typeInfo)

    outStream.print()

    env.execute("Flink SQL Average rating")

  }
}




Reply via email to