The reason why output randomly is sink tasks have not been chained with the
sort task. We can correct the `emitDataStream` in `UnsafeMemoryAppendTableSink`
with:
```
override def emitDataStream(dataStream: DataStream[Row]): Unit = {
val inputParallelism = dataStream.getParallelism
dataStream
.addSink(new MemoryAppendSink)
.setParallelism(inputParallelism)
.name(TableConnectorUtil.generateRuntimeName(this.getClass,
getFieldNames))
}
```
Once we correct it, remove `sorted` in `assertEquals `.
[ Full content available at: https://github.com/apache/flink/pull/6648 ]
This message was relayed via gitbox.apache.org for [email protected]