Hello, I’m new to Flink and I need some help. I’d like to use the SQL API for processing an incoming stream that has the following characteristics:
* Each stream record has a key
* The record can be updated
* The record is of the form: reviewId -> (productId, rating)

For the above stream, I want to compute the average rating for each product ID. The key is the reviewId.

With the SQL API, I get incorrect results. However, I’ve been able to make it work through the use of RichFlatMapFunction and the DataStream API.

Below is the entire code listing, which does not work. I know I’m missing the definition/use of a primary key so that an update on the same key can occur. However, I’m not sure how to go about doing this.

Any help/comments are welcome.

Thank you,
Ghassan

package com.bazaarvoice.flink_poc import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId} import org.apache.flink.api.common.time.Time import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation, _} import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} import org.apache.flink.table.api.scala._ package object flink_poc{ type ProductId = String type ReviewId = String } case class SummaryReview(reviewId: ReviewId, productId: ProductId, approved: Boolean, rating: Double) extends Serializable { override def toString: String = { s"$reviewId, $productId, ${if (approved) "APPROVED" else "REJECTED"}, $rating" } } object AverageRatingWithSQL { def main(args: Array[String]) { val events = List( SummaryReview("review-1", "product-100", approved = true, 1), SummaryReview("review-2", "product-100", approved = true, 4), SummaryReview("review-3", "product-100", approved = true, 6), SummaryReview("review-3", "product-100", approved = true, 3) // <-- this should override the previous record ).toSeq // Average rating should be equal to (1+4+3)/3 = 2.666667 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) val inputStream: DataStream[SummaryReview] = env.fromCollection(events) val tableEnv = 
TableEnvironment.getTableEnvironment(env) tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID, 'ProductID, 'Approved, 'Rating) val resultTable = tableEnv.sql( "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP BY ProductID" ) val typeInfo = createTypeInformation[(ProductId, Double)] val outStream = resultTable.toRetractStream(typeInfo) outStream.print() env.execute("Flink SQL Average rating") } }