Github user hequn8128 commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r184995557 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -230,8 +230,12 @@ abstract class StreamTableEnvironment( tableKeys match { case Some(keys) => upsertSink.setKeyFields(keys) case None if isAppendOnlyTable => upsertSink.setKeyFields(null) - case None if !isAppendOnlyTable => throw new TableException( - "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") + case None if !isAppendOnlyTable && upsertSink.enforceKeyFields() == null => --- End diff -- OK.
---