Thanks very much!

Regards,
Ghassan
> On Jan 5, 2018, at 7:55 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Ghassan,
>
> TableSource in Flink doesn't support primary keys yet, but you can achieve
> the same effect by doing the group-by manually, such as:
>
>   val resultTable = tableEnv.sql(
>     "
>     SELECT ProductID, AVG(Rating) FROM
>       (SELECT ReviewID, LAST_VALUE(ProductID) AS ProductID,
>               LAST_VALUE(Approved) AS Approved, LAST_VALUE(Rating) AS Rating
>        FROM Reviews GROUP BY ReviewID)
>     WHERE Approved = true GROUP BY ProductID
>     "
>   )
>
> You have to implement the LAST_VALUE AggregateFunction yourself. For the
> implementation you can refer to the MAX AggregateFunction (MAX always
> returns the max value, while LAST_VALUE always returns the latest value).
> You can also find documentation on aggregate functions here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
>
> Best, Hequn
>
>
> 2018-01-05 18:31 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Ghassan,
>>
>> Flink's Table API / SQL does not yet support the upsert ingestion mode
>> (updating table rows by key), only append mode, i.e., each event of the
>> data stream is appended to the table. Hence, it is not possible to
>> implement your use case in SQL alone.
>>
>> An upsert ingestion mode will be added in a future version of Flink.
>>
>> Best, Fabian
>>
>> 2017-12-21 5:20 GMT+01:00 Ghassan Yammine <ghassan.yamm...@bazaarvoice.com>:
>>
>>> Hi Timo,
>>>
>>> Thanks for your quick reply.
>>>
>>> I understand your statement about the SQL expression: it groups by
>>> ProductID and performs the average computation over all the records it
>>> sees. There is no concept of a primary key or of INSERT/UPDATE.
>>>
>>> Is there a way to do what I want in SQL, with or without pre-processing
>>> the stream before the SQL expression is executed?
>>>
>>> One thought I had was to transform the stream, prior to the SQL
>>> expression, by morphing the value of each Rating to be relative (instead
>>> of absolute) to the prior Rating for the same review. However, this
>>> would entail some review-tracking mechanism.
>>>
>>> As I mentioned before, I've implemented this using the DataStream API,
>>> internally maintaining a map of reviews and updating the average
>>> calculation as each record is processed. Using SQL is very important to
>>> us, and I'd like to see if I can make it work.
>>>
>>> Regarding the output sink, I previously looked into that and concluded
>>> that I could not perform any "adjustment" to the average calculation
>>> because of what the current SQL expression emits (i.e., an average that
>>> is incorrect for my purposes). Here is what the (retract) stream looks
>>> like post-SQL:
>>>
>>> (true,(product-100,1.0))
>>> (false,(product-100,1.0))
>>> (true,(product-100,2.5))
>>> (false,(product-100,2.5))
>>> (true,(product-100,3.6666666666666665))
>>> (false,(product-100,3.6666666666666665))
>>> (true,(product-100,3.5))  <-- correct per the SQL, but not what I want
>>>
>>> Even if I change what the SQL query returns/emits so that the TableSink
>>> can perform the average calculation, the sink will still have to track
>>> prior reviews in order to update its average. If I'm correct, this is
>>> essentially no different from the DataStream API implementation that I
>>> already have.
>>>
>>> Again, thanks for your quick response and help.
>>>
>>> Regards,
>>>
>>> Ghassan
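To make the retract semantics above concrete: a sink can materialize these
(flag, row) messages into a key-value view, which is what Timo describes
further down the thread. The following is a minimal standalone sketch, not a
Flink TableSink; all names here are illustrative.

// Standalone sketch (not Flink API): apply retract-stream messages to a
// keyed view. A `true` flag adds/updates a row; a `false` flag retracts a
// previously emitted row. Replaying Ghassan's messages above leaves the
// view holding the final SQL result.
import scala.collection.mutable

object RetractReplay {

  // Current view: productId -> latest emitted average
  val view = mutable.Map.empty[String, Double]

  def applyMessage(flag: Boolean, productId: String, avg: Double): Unit =
    if (flag) view(productId) = avg
    else if (view.get(productId).contains(avg)) view.remove(productId)

  def main(args: Array[String]): Unit = {
    val messages = Seq(
      (true,  "product-100", 1.0),
      (false, "product-100", 1.0),
      (true,  "product-100", 2.5),
      (false, "product-100", 2.5),
      (true,  "product-100", 3.6666666666666665),
      (false, "product-100", 3.6666666666666665),
      (true,  "product-100", 3.5)
    )
    messages.foreach { case (f, p, a) => applyMessage(f, p, a) }
    println(view) // Map(product-100 -> 3.5)
  }
}

Replaying the seven messages ends at 3.5, which illustrates Ghassan's point:
such a sink can only materialize whatever average the query emits; correcting
the average itself would still require tracking individual reviews.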
>>> On Dec 20, 2017, at 11:50 AM, Timo Walther <twal...@apache.org> wrote:
>>>
>>> Hi Ghassan,
>>>
>>> in your example the result 3.5 is correct. The query is executed with
>>> standard SQL semantics: you only group by ProductID, and since it is the
>>> same for all elements, the average is 3.5.
>>>
>>> The second "review-3" does not replace anything. In general, the
>>> replacement would happen in the TableSink. The dynamic table performs
>>> view maintenance; the TableSink materializes the result to some
>>> key-value store or database.
>>>
>>> It might be worth looking into TableSinks [0] and the JavaDocs of the
>>> mentioned classes.
>>>
>>> Feel free to ask further questions if necessary.
>>>
>>> Regards,
>>> Timo
>>>
>>> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sourceSinks.html#define-a-tablesink
>>>
>>>
>>> On 12/19/17 at 9:08 PM, Ghassan Yammine wrote:
>>>
>>> Hello,
>>>
>>> I'm new to Flink and I need some help. I'd like to use the SQL API to
>>> process an incoming stream with the following characteristics:
>>>
>>> * Each stream record has a key
>>> * A record can be updated
>>> * A record is of the form: reviewId -> (productId, rating)
>>>
>>> For the above stream, I want to compute the average rating for each
>>> product ID; the key is the reviewId. With the SQL API I get incorrect
>>> results, but I have been able to make it work using a
>>> RichFlatMapFunction and the DataStream API.
>>>
>>> Below is the entire code listing, which runs but does not produce the
>>> result I want. I know I'm missing the definition/use of a primary key so
>>> that an update on the same key can occur; however, I'm not sure how to
>>> go about doing this. Any help/comments are welcome.
>>>
>>> Thank you,
>>>
>>> Ghassan
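Before the full listing below, here is a minimal sketch of the
RichFlatMapFunction/keyed-state approach Ghassan mentions, assuming Flink 1.4
DataStream APIs and reusing the SummaryReview case class from his listing.
The class name, the state name, and the handling of rejected reviews are
illustrative, not taken from the thread.

// Sketch of the keyed-state variant: key by product, keep per-review
// ratings in MapState so a second record for the same reviewId overwrites
// the first, and recompute the average from current state on every record.
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

class UpsertAverage extends RichFlatMapFunction[SummaryReview, (String, Double)] {

  // reviewId -> latest rating, scoped to the current key (the productId)
  private var ratings: MapState[String, java.lang.Double] = _

  override def open(parameters: Configuration): Unit = {
    ratings = getRuntimeContext.getMapState(
      new MapStateDescriptor("ratings", classOf[String], classOf[java.lang.Double]))
  }

  override def flatMap(r: SummaryReview, out: Collector[(String, Double)]): Unit = {
    if (r.approved) ratings.put(r.reviewId, r.rating)
    else ratings.remove(r.reviewId) // drop reviews that are no longer approved

    val current = ratings.values().asScala.map(_.doubleValue()).toSeq
    if (current.nonEmpty) out.collect((r.productId, current.sum / current.size))
  }
}

// Usage: inputStream.keyBy(_.productId).flatMap(new UpsertAverage).print()

Keying by productId scopes the MapState to a single product, so each incoming
record replaces its review's previous rating before the average is recomputed.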
>>> package com.bazaarvoice.flink_poc
>>>
>>> import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
>>> import org.apache.flink.api.common.time.Time
>>> import org.apache.flink.streaming.api.TimeCharacteristic
>>> import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation, _}
>>> import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
>>> import org.apache.flink.table.api.scala._
>>>
>>> package object flink_poc {
>>>   type ProductId = String
>>>   type ReviewId = String
>>> }
>>>
>>> case class SummaryReview(reviewId: ReviewId, productId: ProductId,
>>>                          approved: Boolean, rating: Double) extends Serializable {
>>>   override def toString: String = {
>>>     s"$reviewId, $productId, ${if (approved) "APPROVED" else "REJECTED"}, $rating"
>>>   }
>>> }
>>>
>>> object AverageRatingWithSQL {
>>>
>>>   def main(args: Array[String]) {
>>>
>>>     val events = List(
>>>       SummaryReview("review-1", "product-100", approved = true, 1),
>>>       SummaryReview("review-2", "product-100", approved = true, 4),
>>>       SummaryReview("review-3", "product-100", approved = true, 6),
>>>       SummaryReview("review-3", "product-100", approved = true, 3)  // <-- this should override the previous record
>>>     ).toSeq
>>>     // Average rating should be equal to (1+4+3)/3 = 2.666667
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>
>>>     val inputStream: DataStream[SummaryReview] = env.fromCollection(events)
>>>
>>>     val tableEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>>     tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID, 'ProductID, 'Approved, 'Rating)
>>>
>>>     val resultTable = tableEnv.sql(
>>>       "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved = true GROUP BY ProductID"
>>>     )
>>>
>>>     val typeInfo = createTypeInformation[(ProductId, Double)]
>>>     val outStream = resultTable.toRetractStream(typeInfo)
>>>
>>>     outStream.print()
>>>
>>>     env.execute("Flink SQL Average rating")
>>>   }
>>> }
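Returning to Hequn's suggestion at the top of the thread, here is a minimal
sketch of a LAST_VALUE AggregateFunction against the Flink 1.4 table UDF API.
It is shown for String columns only; since the result type of an
AggregateFunction is fixed, the Approved (Boolean) and Rating (Double)
columns would each need their own variant (or a generic implementation), and
"latest" here means latest in processing order, which is deterministic only
for a non-parallel source. The class and accumulator names are illustrative.

// Sketch of a LAST_VALUE aggregate for String columns. No merge() or
// retract() is shown; retract() would be required if the aggregate's own
// input produced retractions, which is not the case for an append-only
// source stream.
import org.apache.flink.table.functions.AggregateFunction

class LastValueAcc {
  var last: String = _ // null until the first value arrives
}

class LastStringValue extends AggregateFunction[String, LastValueAcc] {

  override def createAccumulator(): LastValueAcc = new LastValueAcc

  // Invoked once per input row, in processing order: keep only the newest value.
  def accumulate(acc: LastValueAcc, value: String): Unit = {
    acc.last = value
  }

  override def getValue(acc: LastValueAcc): String = acc.last
}

// Registration (the name must match the one used in the query), e.g.:
// tableEnv.registerFunction("LAST_VALUE", new LastStringValue)

With such functions registered, the inner GROUP BY ReviewID of Hequn's
rewritten query collapses each review to its latest values, so the outer
aggregation averages over deduplicated reviews and yields the desired
(1+4+3)/3 = 2.666667 for Ghassan's sample data.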