Also, if you expect the key update records to arrive out of order, you might want to add a ProcessFunction on a keyed stream that filters out records with smaller timestamps than the highest timestamp observed so far. This would prevent a record from being overridden by an earlier version with a smaller timestamp.
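A minimal sketch of such a filter could look as follows (assuming Flink 1.4 APIs and a hypothetical KeyedUpdate record type that carries a key and a version timestamp):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical update record: a key plus the version timestamp of the update.
case class KeyedUpdate(key: String, timestamp: Long, value: Double)

class LatestVersionFilter extends ProcessFunction[KeyedUpdate, KeyedUpdate] {

  // Highest timestamp observed so far for the current key.
  private var maxTimestamp: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    maxTimestamp = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("maxTimestamp", classOf[java.lang.Long]))
  }

  override def processElement(
      record: KeyedUpdate,
      ctx: ProcessFunction[KeyedUpdate, KeyedUpdate]#Context,
      out: Collector[KeyedUpdate]): Unit = {
    val highest = maxTimestamp.value()
    if (highest == null || record.timestamp > highest) {
      // Newest version seen so far for this key: remember it and forward it.
      maxTimestamp.update(record.timestamp)
      out.collect(record)
    }
    // Otherwise drop the record: it is an older version of an update
    // that has already been emitted.
  }
}

// Usage: updates.keyBy(_.key).process(new LatestVersionFilter)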
2018-01-05 15:05 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

Ha, that's a neat workaround! Thanks for sharing, Hequn!

When doing this, you should, however, ensure that all records with the same key arrive from the same input task to avoid inconsistent behavior due to records arriving out of order. This would be the case if you ingest the table directly from a Kafka topic that is partitioned by key.

Best,
Fabian

2018-01-05 14:55 GMT+01:00 Hequn Cheng <chenghe...@gmail.com>:

Hi Ghassan,

TableSource in Flink doesn't support primary keys yet, but you can achieve the same effect by doing the grouping manually.

For example:

val resultTable = tableEnv.sql(
  """SELECT ProductID, AVG(Rating) FROM
       (SELECT ReviewID,
               LAST_VALUE(ProductID) AS ProductID,
               LAST_VALUE(Approved) AS Approved,
               LAST_VALUE(Rating) AS Rating
        FROM Reviews GROUP BY ReviewID)
     WHERE Approved = true
     GROUP BY ProductID"""
)

You have to implement the LAST_VALUE AggregateFunction yourself. For the implementation, you can refer to the MAX AggregateFunction (MAX always returns the maximum value, while LAST_VALUE always returns the latest value). You can also find documentation about aggregate functions here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions

Best,
Hequn
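A minimal sketch of such a LAST_VALUE function for Double-valued fields (one instance per field type would be needed, and it assumes per-key arrival order is reliable, as Fabian notes above):

import org.apache.flink.table.functions.AggregateFunction

// Accumulator: just the most recently accumulated value.
class LastDoubleAcc {
  var last: Double = 0.0
}

class LastDoubleValue extends AggregateFunction[Double, LastDoubleAcc] {

  override def createAccumulator(): LastDoubleAcc = new LastDoubleAcc

  override def getValue(acc: LastDoubleAcc): Double = acc.last

  // Unlike MAX, accumulate() does not compare: the latest value always wins.
  def accumulate(acc: LastDoubleAcc, value: Double): Unit = {
    acc.last = value
  }
}

// Registration before use in SQL:
// tableEnv.registerFunction("LAST_VALUE", new LastDoubleValue)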
2018-01-05 18:31 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:

Hi Ghassan,

Flink's Table API / SQL does not support the upsert ingestion mode (updating table rows by key) yet, only append mode, i.e., each event of the data stream is appended to the table. Hence, it is not possible to implement your use case using SQL.

An upsert ingestion mode will be added in a future version of Flink.

Best,
Fabian

2017-12-21 5:20 GMT+01:00 Ghassan Yammine <ghassan.yamm...@bazaarvoice.com>:

Hi Timo,

Thanks for your quick reply.

I understand your statement about the SQL expression: it is grouped by ProductID and performs the average computation over all the records it sees. There is no concept of a primary key or INSERT/UPDATE.

Is there a way to do what I want using SQL, with or without pre-processing of the stream before the SQL expression is executed?

One thought I had was to transform the stream - prior to the SQL expression - by morphing the value of the Rating to be relative (instead of absolute) to the prior Rating for the same review. However, this would entail some review-tracking mechanism.

As I mentioned before, I've implemented this using the DataStream API - internally maintaining a map of reviews and updating the average calculation as each record is processed. Using SQL is very important to us, and I'd like to see if I can make it work.

Regarding the output sink, I previously looked into that and concluded that I could not perform any "adjustment" to the average calculation because of what the current SQL expression emits (i.e., an incorrect average calculation). Here is what the (retract) stream looks like post-SQL:

(true,(product-100,1.0))
(false,(product-100,1.0))
(true,(product-100,2.5))
(false,(product-100,2.5))
(true,(product-100,3.6666666666666665))
(false,(product-100,3.6666666666666665))
(true,(product-100,3.5))   <-- This is the correct value (per the SQL) but not what I want it to be.

Even if I change what the SQL query returns/emits so that the TableSink can perform the average calculation, the latter will have to track prior reviews in order to update its average calculation. If I'm correct, then this is essentially no different than the DataStream API implementation that I have.

Again, thanks for your quick response and help.

Regards,

Ghassan

On Dec 20, 2017, at 11:50 AM, Timo Walther <twal...@apache.org> wrote:

Hi Ghassan,

In your example the result 3.5 is correct. The query is executed with standard SQL semantics. You only group by ProductID, and since it is the same for all elements, the average is 3.5.

The second "review-3" does not replace anything. In general, the replacement would happen in the TableSink. The dynamic table performs view maintenance; the TableSink materializes the result to some key-value store or database.

It might be worth looking into TableSinks [0] and the JavaDocs of the mentioned classes.

Feel free to ask further questions if necessary.

Regards,
Timo

[0] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#define-a-tablesink
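As an illustration of the materialization step Timo describes, a minimal sketch of a sink that applies the retract stream (the (flag, (product, average)) pairs shown above) to a keyed store; an in-memory map stands in here for a real key-value store or database:

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

import scala.collection.mutable

class UpsertingSink extends RichSinkFunction[(Boolean, (String, Double))] {

  // In-memory stand-in for an external key-value store or database.
  private val store = mutable.Map.empty[String, Double]

  override def invoke(value: (Boolean, (String, Double))): Unit = {
    val (isAdd, (productId, avgRating)) = value
    if (isAdd) store(productId) = avgRating  // add or update the row for this key
    else store.remove(productId)             // a retraction deletes the old row
  }
}

// Usage: outStream.addSink(new UpsertingSink)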
Am 12/19/17 um 9:08 PM schrieb Ghassan Yammine:

Hello,

I'm new to Flink and I need some help. I'd like to use the SQL API for processing an incoming stream that has the following characteristics:

* Each stream record has a key
* The record can be updated
* The record is of the form: reviewId -> (productId, rating)

For the above stream, I want to compute the average rating for each product ID. The key is the reviewId.
With the SQL API, I get incorrect results. However, I've been able to make it work through the use of a RichFlatMapFunction and the DataStream API.

Below is the entire code listing, which does not work as expected. I know I'm missing the definition/use of a primary key so that an update on the same key can occur. However, I'm not sure how to go about doing this. Any help/comments are welcome.

Thank you,

Ghassan


package com.bazaarvoice.flink_poc

import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation, _}
import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
import org.apache.flink.table.api.scala._

package object flink_poc {
  type ProductId = String
  type ReviewId = String
}

case class SummaryReview(reviewId: ReviewId, productId: ProductId, approved: Boolean, rating: Double) extends Serializable {
  override def toString: String = {
    s"$reviewId, $productId, ${if (approved) "APPROVED" else "REJECTED"}, $rating"
  }
}

object AverageRatingWithSQL {

  def main(args: Array[String]) {

    val events = List(
      SummaryReview("review-1", "product-100", approved = true, 1),
      SummaryReview("review-2", "product-100", approved = true, 4),
      SummaryReview("review-3", "product-100", approved = true, 6),
      SummaryReview("review-3", "product-100", approved = true, 3)  // <-- this should override the previous record
    ).toSeq
    // Average rating should be equal to (1+4+3)/3 = 2.666667

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

    val inputStream: DataStream[SummaryReview] = env.fromCollection(events)

    val tableEnv = TableEnvironment.getTableEnvironment(env)

    tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID, 'ProductID, 'Approved, 'Rating)

    val resultTable = tableEnv.sql(
      "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP BY ProductID"
    )

    val typeInfo = createTypeInformation[(ProductId, Double)]
    val outStream = resultTable.toRetractStream(typeInfo)

    outStream.print()

    env.execute("Flink SQL Average rating")
  }
}
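For comparison, a minimal sketch of the DataStream API approach Ghassan describes (a keyed RichFlatMapFunction that keeps the latest rating per reviewId and re-emits the average on every update; this is an assumed reconstruction, not his actual code, and reuses the SummaryReview, ProductId, and ReviewId types from the listing above):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class RunningAverage extends RichFlatMapFunction[SummaryReview, (ProductId, Double)] {

  // Latest rating per reviewId for the current product key.
  private var ratings: MapState[ReviewId, java.lang.Double] = _

  override def open(parameters: Configuration): Unit = {
    ratings = getRuntimeContext.getMapState(
      new MapStateDescriptor[ReviewId, java.lang.Double](
        "ratings", classOf[ReviewId], classOf[java.lang.Double]))
  }

  override def flatMap(review: SummaryReview, out: Collector[(ProductId, Double)]): Unit = {
    // A new version of an existing review simply overwrites the old rating.
    if (review.approved) ratings.put(review.reviewId, review.rating)
    else ratings.remove(review.reviewId)  // rejected reviews drop out of the average

    // Recompute the average over the latest version of every review.
    var sum = 0.0
    var count = 0
    val it = ratings.values().iterator()
    while (it.hasNext) { sum += it.next(); count += 1 }
    if (count > 0) out.collect((review.productId, sum / count))
  }
}

// Usage: inputStream.keyBy(_.productId).flatMap(new RunningAverage)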