The reason why output randomly is sink tasks have not been chained with the 
sort task. We can correct the `emitDataStream` in `UnsafeMemoryAppendTableSink` 
with:
```
    override def emitDataStream(dataStream: DataStream[Row]): Unit = {
      val inputParallelism = dataStream.getParallelism
      dataStream
        .addSink(new MemoryAppendSink)
        .setParallelism(inputParallelism)
        .name(TableConnectorUtil.generateRuntimeName(this.getClass, 
getFieldNames))
    }
```
Once we correct it, remove `sorted` in `assertEquals `.

[ Full content available at: https://github.com/apache/flink/pull/6648 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to