Hi Ghassan,
In your example, the result 3.5 is correct. The query is executed with
standard SQL semantics: you only group by ProductID, and since it is the
same for all records, the average is taken over all four ratings,
(1 + 4 + 6 + 3) / 4 = 3.5.
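To make the standard SQL semantics concrete, here is a minimal plain-Scala sketch (no Flink involved) of what the query computes over the four example records. The Review case class and the avgPerProduct helper are hypothetical stand-ins for the SummaryReview records and the SQL aggregation. Every approved row counts, so both "review-3" rows contribute.

```scala
object GroupByAverage {
  // Hypothetical stand-in for the SummaryReview records in the question.
  final case class Review(reviewId: String, productId: String, approved: Boolean, rating: Double)

  val events: List[Review] = List(
    Review("review-1", "product-100", approved = true, 1),
    Review("review-2", "product-100", approved = true, 4),
    Review("review-3", "product-100", approved = true, 6),
    Review("review-3", "product-100", approved = true, 3)
  )

  // Mirrors "WHERE Approved = true GROUP BY ProductID" with AVG(Rating):
  // every input row is part of its group; nothing is deduplicated by reviewId.
  def avgPerProduct(rows: Seq[Review]): Map[String, Double] =
    rows
      .filter(_.approved)
      .groupBy(_.productId)
      .map { case (product, rs) => product -> rs.map(_.rating).sum / rs.size }

  def main(args: Array[String]): Unit =
    println(avgPerProduct(events)) // prints Map(product-100 -> 3.5)
}
```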
The second "review-3" record does not replace anything; it is just
another row of the input table. In general, the replacement would happen
in the TableSink: the dynamic table performs view maintenance, and the
TableSink materializes the result to some key-value store or database.
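As a rough illustration of that sink-side replacement, the sketch below applies (flag, row) messages of the kind toRetractStream emits (true adds a row, false retracts one) to an in-memory map keyed by product ID, which stands in for the key-value store. KeyValueSinkSketch, materialize, and the message list are hypothetical; a real sink would implement one of the TableSink interfaces described in [0] and write to an external system.

```scala
import scala.collection.mutable

object KeyValueSinkSketch {
  // A retract-stream style message: true = add the row, false = retract it.
  type Message = (Boolean, (String, Double))

  // Applies messages in order to an in-memory map that stands in for an
  // external key-value store (productId -> current average rating).
  def materialize(messages: Seq[Message]): Map[String, Double] = {
    val store = mutable.Map.empty[String, Double]
    messages.foreach {
      case (true, (key, value)) =>
        store(key) = value // insert, or overwrite the previous value for this key
      case (false, (key, value)) =>
        // Remove the entry only if it still holds the value being retracted.
        if (store.get(key).contains(value)) store.remove(key)
    }
    store.toMap
  }

  // Hypothetical message sequence for one key whose aggregate changes twice.
  val exampleMessages: List[Message] = List(
    (true, ("product-100", 1.0)),
    (false, ("product-100", 1.0)),
    (true, ("product-100", 2.5)),
    (false, ("product-100", 2.5)),
    (true, ("product-100", 3.5))
  )

  def main(args: Array[String]): Unit =
    println(materialize(exampleMessages))
}
```

Because the store is keyed, each new average overwrites the previous one, so only the latest value per product remains.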
It might be worth looking into TableSinks [0] and the JavaDocs of the
mentioned classes.
Feel free to ask further questions if necessary.
Regards,
Timo
[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#define-a-tablesink
On 12/19/17 at 9:08 PM, Ghassan Yammine wrote:
Hello,
I’m new to Flink and I need some help. I’d like to use the SQL API for
processing an incoming stream that has the following characteristics:
* Each stream record has a key
* The record can be updated
* The record is of the form: reviewId -> (productId, rating)
For the above stream, I want to compute the average rating for each product ID.
The key is the reviewId, so a later record with the same reviewId should replace the earlier one.
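A minimal plain-Scala sketch of these intended semantics, assuming reviewId acts as a primary key: keep only the last record seen for each reviewId, then average the approved ratings per product. For the example records this yields (1 + 4 + 3) / 3, roughly 2.667. All names are hypothetical, and this is a batch computation over a finite list rather than a streaming job.

```scala
object LatestPerKeyAverage {
  // Hypothetical stand-in for the SummaryReview records.
  final case class Review(reviewId: String, productId: String, approved: Boolean, rating: Double)

  val events: List[Review] = List(
    Review("review-1", "product-100", approved = true, 1),
    Review("review-2", "product-100", approved = true, 4),
    Review("review-3", "product-100", approved = true, 6),
    Review("review-3", "product-100", approved = true, 3) // replaces the rating 6
  )

  // Upsert by reviewId (later records win), then average per productId.
  def avgLatestPerReview(rows: Seq[Review]): Map[String, Double] = {
    val latest: Map[String, Review] =
      rows.foldLeft(Map.empty[String, Review])((acc, r) => acc + (r.reviewId -> r))
    latest.values
      .filter(_.approved)
      .groupBy(_.productId)
      .map { case (product, rs) => product -> rs.map(_.rating).sum / rs.size }
  }

  def main(args: Array[String]): Unit =
    println(avgLatestPerReview(events)) // product-100 -> roughly 2.667
}
```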
With the SQL API, I get incorrect results. However, I’ve been able to make it
work with a RichFlatMapFunction and the DataStream API.
Below is the entire code listing, which does not produce the expected
result. I know I’m missing the definition/use of a primary key so that an
update on the same key can occur. However, I’m not sure how to go about
doing this. Any help or comments are welcome.
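One way to picture an update on the same key for a streaming aggregate, sketched in plain Scala under the assumption that reviewId is the primary key: the operator remembers the last record per reviewId and a running sum and count per product. When a review is updated, its old rating is subtracted before the new one is added, and the operator emits a retraction of the old average followed by the new one. This is a hypothetical illustration, not Flink's implementation and not the RichFlatMapFunction mentioned above.

```scala
import scala.collection.mutable

object IncrementalAverageSketch {
  final case class Review(reviewId: String, productId: String, approved: Boolean, rating: Double)

  // Output message in retract-stream style: true = add row, false = retract row.
  type Message = (Boolean, (String, Double))

  final class Aggregator {
    private val lastByReview = mutable.Map.empty[String, Review]
    private val sumByProduct = mutable.Map.empty[String, Double].withDefaultValue(0.0)
    private val countByProduct = mutable.Map.empty[String, Int].withDefaultValue(0)

    private def current(product: String): Option[Double] =
      if (countByProduct(product) > 0) Some(sumByProduct(product) / countByProduct(product))
      else None

    // Processes one record and returns the messages that update the result.
    def process(r: Review): List[Message] = {
      val previous = lastByReview.get(r.reviewId)
      val affected = (previous.map(_.productId).toList :+ r.productId).distinct
      val before = affected.map(p => p -> current(p)).toMap

      // Undo the contribution of the previous version of this review, if any.
      previous.filter(_.approved).foreach { old =>
        sumByProduct(old.productId) -= old.rating
        countByProduct(old.productId) -= 1
      }
      // Add the contribution of the new version.
      if (r.approved) {
        sumByProduct(r.productId) += r.rating
        countByProduct(r.productId) += 1
      }
      lastByReview(r.reviewId) = r

      // Retract the old average and add the new one for every changed product.
      affected.flatMap { p =>
        val after = current(p)
        if (before(p) == after) Nil
        else before(p).map(v => (false, (p, v))).toList ++ after.map(v => (true, (p, v))).toList
      }
    }

    def averages: Map[String, Double] =
      countByProduct.keys.flatMap(p => current(p).map(p -> _)).toMap
  }

  val events: List[Review] = List(
    Review("review-1", "product-100", approved = true, 1),
    Review("review-2", "product-100", approved = true, 4),
    Review("review-3", "product-100", approved = true, 6),
    Review("review-3", "product-100", approved = true, 3)
  )

  def main(args: Array[String]): Unit = {
    val aggregator = new Aggregator
    events.foreach(r => aggregator.process(r).foreach(println))
    println(aggregator.averages)
  }
}
```

For the four example records, the final state holds 8.0 / 3 for product-100, and the last two messages retract the previous average and add the corrected one.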
Thank you,
Ghassan
package com.bazaarvoice.flink_poc
import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation, _}
import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
import org.apache.flink.table.api.scala._
package object flink_poc {
  type ProductId = String
  type ReviewId = String
}
case class SummaryReview(reviewId: ReviewId, productId: ProductId,
                         approved: Boolean, rating: Double) extends Serializable {
  override def toString: String = {
    s"$reviewId, $productId, ${if (approved) "APPROVED" else "REJECTED"}, $rating"
  }
}
object AverageRatingWithSQL {
  def main(args: Array[String]): Unit = {
    val events = List(
      SummaryReview("review-1", "product-100", approved = true, 1),
      SummaryReview("review-2", "product-100", approved = true, 4),
      SummaryReview("review-3", "product-100", approved = true, 6),
      SummaryReview("review-3", "product-100", approved = true, 3) // <-- this should override the previous record
    ).toSeq
    // Average rating should be equal to (1+4+3)/3 = 2.666667
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    val inputStream: DataStream[SummaryReview] = env.fromCollection(events)
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID, 'ProductID, 'Approved, 'Rating)
    val resultTable = tableEnv.sql(
      "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP BY ProductID"
    )
    val typeInfo = createTypeInformation[(ProductId, Double)]
    val outStream = resultTable.toRetractStream(typeInfo)
    outStream.print()
    env.execute("Flink SQL Average rating")
  }
}