Also, if you expect your the key update records to be out of order, you
might want to add a ProcessFunction on a keyed stream that filters records
with smaller timestamps than the highest observed timestamp.
This would prevent a record to be overridden by an earlier version with a
smaller timestamp.

2018-01-05 15:05 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Ha, that's a neat workaround! Thanks for sharing Hequn!
>
> When doing this, you should however, ensure that all records with the same
> key arrive from the same input task to avoid inconsistent behavior due to
> records arriving out of order.
> This would be the case if you ingest the table directly from a Kafka topic
> that is partitioned by key.
>
> Best, Fabian
>
> 2018-01-05 14:55 GMT+01:00 Hequn Cheng <chenghe...@gmail.com>:
>
>> hi Ghassan,
>>
>> TableSource in Flink doesn't support primary key now, but you can achieve
>> it by doing a group by manually.
>>
>> Such as:
>> val resultTable = tableEnv.sql(
>>       "
>>            SELECT ProductID, AVG(Rating) FROM
>>                  ( SELECT ReviewID, LAST_VALUE(ProductID),
>> LAST_VALUE(Approved) , LAST_VALUE(Rating) FROM Reviews GROUP BY  ReviewID)
>>            WHERE Approved = true GROUP BY ProductID
>>         "
>>     )
>>
>> You have to implement the LAST_VALUE AggregateFunction. For
>> implementation,
>> you can refer to the MAX AggregateFunction(MAX always return the max value
>> while LAST_VALUE always return the latest value). Also, you can find
>> documents about Aggregate Functions here: https://ci.apache.org/
>> projects/flink/flink-docs-release-1.4/dev/table/udfs.
>> html#aggregation-functions
>>
>> Best, Hequn
>>
>>
>> 2018-01-05 18:31 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
>>
>> > Hi Ghassan,
>> >
>> > Flink's Table API / SQL does not support the upsert ingestion mode
>> > (updating table rows by key) yet but only append mode, i.e, each event
>> of
>> > the data stream is appended to the table.
>> > Hence, it is not possible to implement your use case using SQL.
>> >
>> > An upsert ingestion mode will be added in future version of Flink.
>> >
>> > Best, Fabian
>> >
>> > 2017-12-21 5:20 GMT+01:00 Ghassan Yammine <
>> ghassan.yamm...@bazaarvoice.com
>> > >:
>> >
>> > > Hi Timo,
>> > >
>> > > Thanks for your quick reply.
>> > >
>> > > I understand your statement about the SQL expression:  It is grouped
>> by
>> > > ProductID and performs the average computation over all the records it
>> > > sees.  There is no concept of primary key and INSERT/UPDATE.
>> > >
>> > > Is there a way to do what I want using SQL without, or with, the
>> > > pre-processing of the stream before a SQL expression is executed?
>> > >
>> > > One thought I had was to transform the stream - prior to the SQL
>> > > expression - by doing something like morphing the value of the Rating
>> to
>> > be
>> > > relative (instead of absolute) to the prior Rating, for the same
>> review.
>> > > However, this would entail some review tracking mechanism.
>> > >
>> > > As I mentioned before, I’ve implemented this using the Datastream API
>> -
>> > > internally maintaining a map of reviews and updating the average
>> > > calculation as each record is processed.  Using SQL is very important
>> to
>> > > us, and I’d like to see if I can make it work.
>> > >
>> > > Regarding the output sink, I previously looked into that and concluded
>> > > that I could not perform any “adjustment” to the average calculation
>> > > because of what the current SQL expression emits (i.e. incorrect
>> average
>> > > calc.).  Here is what the (retract) stream looks like post-SQL:
>> > >
>> > > (true,(product-100,1.0))
>> > > (false,(product-100,1.0))
>> > > (true,(product-100,2.5))
>> > > (false,(product-100,2.5))
>> > > (true,(product-100,3.6666666666666665))
>> > > (false,(product-100,3.6666666666666665))
>> > > (true,(product-100,3.5)).   <———————— This is the correct value (per
>> the
>> > > SQL) but not what I want it to be.
>> > >
>> > > Even if I change what the SQL query returns/emits so that the
>> TableSink
>> > > can perform the average calculation, the latter will have to track
>> prior
>> > > reviews in order to update its average calculation.  If I’m correct,
>> then
>> > > this is essentially no different than the Datastream API
>> implementation
>> > > that I have.
>> > >
>> > > Agains, thanks for your quick response and help.
>> > >
>> > > Regards,
>> > >
>> > > Ghassan
>> > >
>> > >
>> > >
>> > > On Dec 20, 2017, at 11:50 AM, Timo Walther <twal...@apache.org
>> <mailto:
>> > twa
>> > > l...@apache.org>> wrote:
>> > >
>> > > Hi Ghassan,
>> > >
>> > > in your example the result 3.5 is correct. The query is executed with
>> > > standard SQL semantics. You only group by ProductID and since it is
>> the
>> > > same for all elements, the average is 3.5.
>> > >
>> > > The second "review-3" does not replace anything. In general, the
>> > > replacement would happen in the TableSink. The dynamic table performs
>> > view
>> > > maintenance. The TableSink materializes the result to some key-value
>> > store
>> > > or database.
>> > >
>> > > It might be worth to look into TableSinks [0] and the JavaDocs of the
>> > > mentioned classes.
>> > >
>> > > Feel free to ask further questions if necessary.
>> > >
>> > > Regards,
>> > > Timo
>> > >
>> > > [0] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> > dev/table/
>> > > sourceSinks.html#define-a-tablesink
>> > >
>> > >
>> > >
>> > > Am 12/19/17 um 9:08 PM schrieb Ghassan Yammine:
>> > > Hello,
>> > >
>> > > I’m knew to Flink and I need some help.  I’d like to use the SQL API
>> for
>> > > processing an incoming stream that has the following characteristics:
>> > >
>> > >   *   Each stream record has a key
>> > >   *   The record can be updated
>> > >   *   The record is of the form: reviewId -> (productId, rating)
>> > >
>> > > For the above stream, I want to compute the average rating for each
>> > > product ID.  The key is the reviewId
>> > > With the SQL API, I get incorrect results.  However, I’ve been able to
>> > > make it work through the use of RichFlatMapFunction and the Datastream
>> > API.
>> > >
>> > > Below is the entire code listing, which does not work.  I know I’m
>> > missing
>> > > the definition/use of a primary key so that an update on the same key
>> can
>> > > occur.
>> > > However, I’m not sure how to go about doing this.  Any help/comments
>> are
>> > > welcome.
>> > >
>> > > Thank you,
>> > >
>> > > Ghassan
>> > >
>> > >
>> > > package com.bazaarvoice.flink_poc
>> > >
>> > > import com.bazaarvoice.flink_poc.flink_poc.{ProductId, ReviewId}
>> > > import org.apache.flink.api.common.time.Time
>> > > import org.apache.flink.streaming.api.TimeCharacteristic
>> > > import org.apache.flink.streaming.api.scala.{DataStream,
>> > > createTypeInformation, _}
>> > > import org.apache.flink.table.api.{StreamQueryConfig,
>> TableEnvironment}
>> > > import org.apache.flink.table.api.scala._
>> > >
>> > > package object flink_poc{
>> > >   type ProductId = String
>> > >   type ReviewId = String
>> > > }
>> > >
>> > > case class SummaryReview(reviewId: ReviewId, productId: ProductId,
>> > > approved: Boolean, rating: Double) extends Serializable {
>> > >   override def toString: String = {
>> > >     s"$reviewId,  $productId,  ${if (approved) "APPROVED" else
>> > > "REJECTED"},  $rating"
>> > >   }
>> > > }
>> > >
>> > > object AverageRatingWithSQL {
>> > >
>> > >   def main(args: Array[String]) {
>> > >
>> > >     val events = List(
>> > >       SummaryReview("review-1", "product-100", approved = true, 1),
>> > >       SummaryReview("review-2", "product-100", approved = true, 4),
>> > >       SummaryReview("review-3", "product-100", approved = true, 6),
>> > >       SummaryReview("review-3", "product-100", approved = true, 3)
>> //
>> > > <-- this should override the previous record
>> > >     ).toSeq
>> > >     // Average rating should be equal to (1+4+3)/3 = 2.666667
>> > >
>> > >     val env = StreamExecutionEnvironment.getExecutionEnvironment
>> > >     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>> > >
>> > >     val inputStream: DataStream[SummaryReview] =
>> > env.fromCollection(events)
>> > >
>> > >     val tableEnv = TableEnvironment.getTableEnvironment(env)
>> > >
>> > >     tableEnv.registerDataStream("Reviews", inputStream, 'ReviewID,
>> > > 'ProductID, 'Approved, 'Rating)
>> > >
>> > >     val resultTable = tableEnv.sql(
>> > >       "SELECT ProductID, AVG(Rating) FROM Reviews WHERE Approved =
>> true
>> > > GROUP BY ProductID"
>> > >     )
>> > >
>> > >     val typeInfo = createTypeInformation[(ProductId, Double)]
>> > >     val outStream = resultTable.toRetractStream(typeInfo)
>> > >
>> > >     outStream.print()
>> > >
>> > >     env.execute("Flink SQL Average rating")
>> > >
>> > >   }
>> > > }
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> >
>>
>
>

Reply via email to