Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r141341300

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -130,34 +128,31 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
     		byte msgId = msg.readByte();
     
    -		NettyMessage decodedMsg = null;
    -
    -		if (msgId == BufferResponse.ID) {
    -			decodedMsg = new BufferResponse();
    -		}
    -		else if (msgId == PartitionRequest.ID) {
    -			decodedMsg = new PartitionRequest();
    -		}
    -		else if (msgId == TaskEventRequest.ID) {
    -			decodedMsg = new TaskEventRequest();
    -		}
    -		else if (msgId == ErrorResponse.ID) {
    -			decodedMsg = new ErrorResponse();
    -		}
    -		else if (msgId == CancelPartitionRequest.ID) {
    -			decodedMsg = new CancelPartitionRequest();
    -		}
    -		else if (msgId == CloseRequest.ID) {
    -			decodedMsg = new CloseRequest();
    -		}
    -		else {
    -			throw new IllegalStateException("Received unknown message from producer: " + msg);
    +		final NettyMessage decodedMsg;
    +		switch (msgId) {
    +			case BufferResponse.ID:
    +				decodedMsg = BufferResponse.readFrom(msg);
    +				break;
    +			case PartitionRequest.ID:
    +				decodedMsg = PartitionRequest.readFrom(msg);
    +				break;
    +			case TaskEventRequest.ID:
    +				decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
    +				break;
    +			case ErrorResponse.ID:
    +				decodedMsg = ErrorResponse.readFrom(msg);
    +				break;
    +			case CancelPartitionRequest.ID:
    +				decodedMsg = CancelPartitionRequest.readFrom(msg);
    +				break;
    +			case CloseRequest.ID:
    +				decodedMsg = CloseRequest.readFrom(msg);
    +				break;
    +			default:
    +				throw new IllegalStateException("Received unknown message from producer: " + msg);
    --- End diff --

    I agree that `IllegalStateException` is not the best fit here, but also, `UnsupportedOperationException` is not - what do you think about `ProtocolException`?
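For illustration, here is a minimal sketch of what the `default` branch could look like with `ProtocolException`. The comment does not say which `ProtocolException` is meant; this sketch assumes `java.net.ProtocolException` from the JDK, which is a checked `IOException`. The class name `MessageDecoderSketch`, the ID constants, and the string return values are hypothetical stand-ins for the real `NettyMessage` subclasses and their `readFrom` methods.

```java
import java.net.ProtocolException;

public class MessageDecoderSketch {

    // Hypothetical IDs for illustration only; the real values are
    // defined on the NettyMessage subclasses.
    static final byte BUFFER_RESPONSE_ID = 0;
    static final byte CLOSE_REQUEST_ID = 5;

    /**
     * Dispatches on the message ID. Returns a label standing in for the
     * decoded message, or throws ProtocolException for an unknown ID.
     */
    static String decode(byte msgId) throws ProtocolException {
        switch (msgId) {
            case BUFFER_RESPONSE_ID:
                return "BufferResponse";
            case CLOSE_REQUEST_ID:
                return "CloseRequest";
            default:
                // Assumption: java.net.ProtocolException; it is checked,
                // so the caller must declare or handle it.
                throw new ProtocolException("Received unknown message ID: " + msgId);
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(decode(BUFFER_RESPONSE_ID));
            decode((byte) 42); // unknown ID, takes the default branch
        } catch (ProtocolException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Because `java.net.ProtocolException` is checked, the enclosing method has to declare it. The `decode` override shown in the diff would need a compatible `throws` clause for this to compile, which is worth confirming against the actual signature in `NettyMessage.java`.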
---