Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4517#discussion_r141341300
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
---
@@ -130,34 +128,31 @@ protected void decode(ChannelHandlerContext ctx,
ByteBuf msg, List<Object> out)
byte msgId = msg.readByte();
- NettyMessage decodedMsg = null;
-
- if (msgId == BufferResponse.ID) {
- decodedMsg = new BufferResponse();
- }
- else if (msgId == PartitionRequest.ID) {
- decodedMsg = new PartitionRequest();
- }
- else if (msgId == TaskEventRequest.ID) {
- decodedMsg = new TaskEventRequest();
- }
- else if (msgId == ErrorResponse.ID) {
- decodedMsg = new ErrorResponse();
- }
- else if (msgId == CancelPartitionRequest.ID) {
- decodedMsg = new CancelPartitionRequest();
- }
- else if (msgId == CloseRequest.ID) {
- decodedMsg = new CloseRequest();
- }
- else {
- throw new IllegalStateException("Received
unknown message from producer: " + msg);
+ final NettyMessage decodedMsg;
+ switch (msgId) {
+ case BufferResponse.ID:
+ decodedMsg =
BufferResponse.readFrom(msg);
+ break;
+ case PartitionRequest.ID:
+ decodedMsg =
PartitionRequest.readFrom(msg);
+ break;
+ case TaskEventRequest.ID:
+ decodedMsg =
TaskEventRequest.readFrom(msg, getClass().getClassLoader());
+ break;
+ case ErrorResponse.ID:
+ decodedMsg =
ErrorResponse.readFrom(msg);
+ break;
+ case CancelPartitionRequest.ID:
+ decodedMsg =
CancelPartitionRequest.readFrom(msg);
+ break;
+ case CloseRequest.ID:
+ decodedMsg = CloseRequest.readFrom(msg);
+ break;
+ default:
+ throw new
IllegalStateException("Received unknown message from producer: " + msg);
--- End diff --
I agree that `IllegalStateException` is not the best fit here, but also,
`UnsupportedOperationException` is not - what do you think about
`ProtocolException`?
---