Github user suez1224 commented on the issue:
https://github.com/apache/flink/pull/6201
@fhueske @twalthr thanks for the comments. Regarding `from-source`, the only
systems I know of are Kafka 0.10 and Kafka 0.11, which support writing a record
along with its timestamp. To support `from-source` in a table sink, I think we
can do the following:
1) Add a connector property, e.g. `connector.support-timestamp`. Only if
`connector.support-timestamp` is true will we allow the sink table schema to
contain a field with rowtime type `from-source`; otherwise an exception will
be thrown (see the first sketch after this list).
2) If the condition in 1) is satisfied, we will create the corresponding
rowtime field in the sink table schema with type LONG. In
`TableEnvironment.insertInto()`, we will validate the sink schema against the
insertion source. Also, in the `TableSink.emitDataStream()` implementation, we
will need to insert a timestamp assigner operator that sets
`StreamRecord.timestamp` (should we reuse an existing interface, or create a new
timestamp-inserter interface?) and removes the extra rowtime field from
`StreamRecord.value` before we emit the DataStream to the sink (see the second
sketch after this list). For `KafkaTableSink`, we will also need to invoke
`setWriteTimestampToKafka(true)`.
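To make 1) concrete, here is a minimal sketch of the validation, assuming the connector properties arrive as a plain `Map<String, String>`; the property key `connector.support-timestamp` is the one proposed above, and the helper class/method names are hypothetical, not existing Flink APIs:

```java
import java.util.Map;

public final class SinkRowtimeValidator {

    private static final String SUPPORT_TIMESTAMP_KEY = "connector.support-timestamp";

    /**
     * Throws if the sink schema declares a rowtime field of type
     * 'from-source' but the connector did not opt in via
     * 'connector.support-timestamp'.
     */
    public static void validateFromSourceRowtime(
            Map<String, String> connectorProperties,
            boolean schemaHasFromSourceRowtime) {
        boolean supportsTimestamp = Boolean.parseBoolean(
                connectorProperties.getOrDefault(SUPPORT_TIMESTAMP_KEY, "false"));
        if (schemaHasFromSourceRowtime && !supportsTimestamp) {
            throw new IllegalArgumentException(
                    "Sink schema contains a rowtime field of type 'from-source', "
                    + "but the connector does not set '" + SUPPORT_TIMESTAMP_KEY
                    + "' to true.");
        }
    }
}
```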
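And here is a minimal sketch of the operator from 2), assuming the rowtime field holds epoch-millis as a `Long` at a known index in the `Row`; the class name `RowtimeInserterOperator` is hypothetical:

```java
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

public class RowtimeInserterOperator
        extends AbstractStreamOperator<Row>
        implements OneInputStreamOperator<Row, Row> {

    private final int rowtimeFieldIndex;

    public RowtimeInserterOperator(int rowtimeFieldIndex) {
        this.rowtimeFieldIndex = rowtimeFieldIndex;
    }

    @Override
    public void processElement(StreamRecord<Row> element) {
        Row in = element.getValue();
        long timestamp = (Long) in.getField(rowtimeFieldIndex);

        // Copy all fields except the rowtime field.
        Row out = new Row(in.getArity() - 1);
        int j = 0;
        for (int i = 0; i < in.getArity(); i++) {
            if (i != rowtimeFieldIndex) {
                out.setField(j++, in.getField(i));
            }
        }

        // Set the StreamRecord timestamp and emit the stripped row.
        output.collect(element.replace(out, timestamp));
    }
}
```

In `emitDataStream()`, this could presumably be wired in with something like `dataStream.transform("RowtimeInserter", outputType, new RowtimeInserterOperator(idx))` before the stream is handed to the producer; with `setWriteTimestampToKafka(true)`, the Kafka producer would then write the `StreamRecord` timestamp into the Kafka record.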
Please correct me if I missed something here. What do you think?
---