## What is the purpose of the change

Currently, only append streams can be ingested as stream tables. This pull request implements proctime DataStream-to-Table conversion with upsert semantics.
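Conceptually, turning an upsert stream into a table means keeping the last row per key and converting each update into a retraction of the old row followed by an insertion of the new one. The Scala sketch below models this in memory under stated assumptions: the `Boolean` flag means `true` = upsert and `false` = delete (matching the `DataStream[(Boolean, ...)]` input type used by the API), a plain `mutable.Map` stands in for Flink's keyed state, and `LastRowSketch` is a hypothetical name. It is an illustration, not the `LastRowProcessFunction` added by this PR. Mapping every row to a constant key models the keyless case, where the table holds at most one row.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of last-row upsert semantics (not a Flink API).
// Assumption: flag true = upsert, false = delete.
final class LastRowSketch[K, R](keyOf: R => K) {
  // Last row seen per key; stands in for Flink keyed state.
  private val state = mutable.Map.empty[K, R]

  /** Consumes one upsert message and returns the retract-stream messages it produces. */
  def process(msg: (Boolean, R)): Seq[(Boolean, R)] = {
    val (isUpsert, row) = msg
    val key = keyOf(row)
    (isUpsert, state.get(key)) match {
      case (true, Some(prev)) =>
        // Update: retract the previous row, then insert the new one.
        state(key) = row
        Seq((false, prev), (true, row))
      case (true, None) =>
        // First row for this key: plain insert.
        state(key) = row
        Seq((true, row))
      case (false, Some(prev)) =>
        // Delete: retract the stored row and drop the key's state.
        state.remove(key)
        Seq((false, prev))
      case (false, None) =>
        // Delete for an unknown key: nothing to retract.
        Seq.empty
    }
  }
}

object LastRowSketchDemo {
  def main(args: Array[String]): Unit = {
    // Keyed on the third field, mirroring 'c.key in the API example.
    val keyed = new LastRowSketch[Int, (String, Long, Int)](_._3)
    println(keyed.process((true, ("a", 1L, 1))))  // insert
    println(keyed.process((true, ("b", 2L, 1))))  // retract ("a",1,1), then insert ("b",2,1)
    println(keyed.process((false, ("b", 2L, 1)))) // retract ("b",2,1)

    // No key: every row maps to the same key, so at most one row is live.
    val single = new LastRowSketch[Unit, (String, Long, Int)](_ => ())
    single.process((true, ("a", 1L, 1)))
    println(single.process((true, ("x", 9L, 9)))) // retract ("a",1,1), then insert ("x",9,9)
  }
}
```

Emitting the retraction before the new row lets downstream operators consume the result as an ordinary retract stream, and the keyless instance shows why an upsert stream without a key collapses into a single-row table.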
The API looks like this:

```scala
val input: DataStream[(Boolean, (String, Long, Int))] = ???

// upsert with key
val keyedTable: Table = tEnv.fromUpsertStream(input, 'a, 'b, 'c.key)

// upsert without key -> single-row table
val singleRowTable: Table = tEnv.fromUpsertStream(input, 'a, 'b, 'c)
```

A simple design doc can be found [here](https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing).

## Brief change log

- Add `fromUpsertStream()` and `fromAppendStream()` to the Java and Scala `StreamTableEnvironment`, and deprecate `fromDataStream`.
- Add key support to the source definition, including parsing of the `key` keyword.
- Add `DataStreamLastRowRule` and `DataStreamLastRowAfterCalcRule`. Both rules generate a `DataStreamLastRow` node to handle upsert streams. The difference between the two rules is that `DataStreamLastRowAfterCalcRule` takes the calc into consideration and generates the LastRow DataStreamRel node after the calc, which can decrease the state size in LastRow.
- Add `LastRowProcessFunction` to handle upsert messages and generate retractions when there is an update.

## Verifying this change

This change added tests and can be verified as follows:

- Add Java API test in `JavaSqlITCase`
- Add IT cases in `FromUpsertStreamITCase`
- Add SQL plan tests in `FromUpsertStreamTest`
- Add key extraction tests in `UpdatingPlanCheckerTest`
- Add validation tests in `StreamTableEnvironmentValidationTest`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented?
  (not documented yet; documentation will be added in a later PR)

[ Full content available at: https://github.com/apache/flink/pull/6787 ]
