## What is the purpose of the change

Currently, only append streams can be ingested as stream tables. This pull 
request implements processing-time (proctime) upsert conversion from a DataStream to a Table.

The API looks like this:

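```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

val tEnv: StreamTableEnvironment = ???
val input: DataStream[(Boolean, (String, Long, Int))] = ???
// upsert with a key: field 'c is declared as the key of the table
val keyedTable = tEnv.fromUpsertStream(input, 'a, 'b, 'c.key)
// upsert without a key: the result is a single-row table
val singleRowTable = tEnv.fromUpsertStream(input, 'a, 'b, 'c)
```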
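To make the difference between the keyed and unkeyed calls concrete, here is a small framework-free Scala model of the table each one would describe. It is a sketch, not Flink code: `UpsertTableModel`, `materialize`, and the sample data are hypothetical. It assumes that the `Boolean` flag means upsert (`true`) or delete (`false`), mirroring the convention of Flink's `UpsertStreamTableSink`, and that a delete message carries at least the key fields. This PR description does not state either convention explicitly.

```scala
// Hypothetical, framework-free model of the table an upsert stream describes.
// Not Flink code: plain Scala collections stand in for the dynamic table.
object UpsertTableModel {
  type Row = (String, Long, Int)

  // Assumption: flag = true means upsert, false means delete, and a delete
  // message carries at least the key fields of the row it removes.
  def materialize[K](messages: Seq[(Boolean, Row)], keyOf: Row => K): Map[K, Row] =
    messages.foldLeft(Map.empty[K, Row]) {
      case (table, (true, row))  => table.updated(keyOf(row), row) // insert or replace
      case (table, (false, row)) => table - keyOf(row)             // delete by key
    }

  // Sample messages for the fields ('a, 'b, 'c).
  val sample: Seq[(Boolean, Row)] = Seq(
    (true, ("x", 1L, 1)),
    (true, ("y", 2L, 2)),
    (false, ("y", 2L, 2)),
    (true, ("z", 3L, 1)), // same 'c as ("x", 1L, 1): replaces it when keyed on 'c
    (true, ("w", 4L, 3))
  )

  // Keyed on 'c, like fromUpsertStream(input, 'a, 'b, 'c.key).
  def keyedOnC: Map[Int, Row] = materialize(sample, (r: Row) => r._3)

  // Unkeyed, like fromUpsertStream(input, 'a, 'b, 'c): every message shares
  // one constant key, so at most one row remains in the table.
  def unkeyed: Map[Unit, Row] = materialize(sample, (_: Row) => ())
}
```

Keyed on `'c`, the sample ends with two rows (`'c = 1` and `'c = 3`); unkeyed, every message overwrites or deletes the same single row, so only the last upserted row survives.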

A brief design document can be found 
[here](https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing).


## Brief change log

  - Add `fromUpsertStream()` and `fromAppendStream()` to the Java and Scala 
`StreamTableEnvironment`, and deprecate `fromDataStream()`.
  - Add key support to the source definition, including parsing of the `key` keyword.
  - Add `DataStreamLastRowRule` and `DataStreamLastRowAfterCalcRule`. Both 
rules generate a `DataStreamLastRow` node to handle the upsert stream. The difference 
between the two rules is that `DataStreamLastRowAfterCalcRule` takes the calc into 
account and places the `DataStreamLastRow` node after the calc, which can 
reduce the state size of LastRow.
  - Add `LastRowProcessFunction` to handle upsert messages and generate 
retractions if there is an update.
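The bullet above describes `LastRowProcessFunction` only in words. The following framework-free Scala sketch models that described behavior: per key, remember the last row; on an update, retract the previous row and emit the new one; on a delete, retract the stored row. It is not the PR's implementation. `LastRowSketch`, `LastRow`, and `process` are hypothetical names, a mutable `Map` stands in for Flink keyed state, and the input flag convention (`true` = upsert, `false` = delete) is an assumption.

```scala
import scala.collection.mutable

// Hypothetical, framework-free model of the behavior described for
// LastRowProcessFunction; a real Flink function would use keyed state.
object LastRowSketch {
  // Output changelog message: (true, row) = add, (false, row) = retract.
  type Change[R] = (Boolean, R)

  final class LastRow[K, R](keyOf: R => K) {
    // Stands in for per-key state: the last row seen for each key.
    private val lastRow = mutable.Map.empty[K, R]

    // Processes one upsert message (assumed: true = upsert, false = delete)
    // and returns the retraction-stream messages it produces.
    def process(isUpsert: Boolean, row: R): Seq[Change[R]] = {
      val key = keyOf(row)
      (isUpsert, lastRow.get(key)) match {
        case (true, Some(prev)) =>
          lastRow.update(key, row)
          Seq((false, prev), (true, row)) // update: retract old row, add new row
        case (true, None) =>
          lastRow.update(key, row)
          Seq((true, row))                // first row for this key
        case (false, Some(prev)) =>
          lastRow.remove(key)
          Seq((false, prev))              // delete: retract the stored row
        case (false, None) =>
          Seq.empty                       // delete of an unknown key: nothing to retract
      }
    }
  }
}
```

Retracting the stored row, rather than echoing the delete message, means downstream operators always see the exact row they previously received, even if a delete message carried only the key fields.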


## Verifying this change

This change adds tests and can be verified as follows:

  - Add Java API tests in `JavaSqlITCase`
  - Add IT cases in `FromUpsertStreamITCase`
  - Add SQL plan tests in `FromUpsertStreamTest`
  - Add key extraction tests in `UpdatingPlanCheckerTest`
  - Add validation tests in `StreamTableEnvironmentValidationTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not documented yet; documentation will be 
added in a follow-up PR)


[ Full content available at: https://github.com/apache/flink/pull/6787 ]