GitHub user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4517#discussion_r141341300
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
    @@ -130,34 +128,31 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
     
                        byte msgId = msg.readByte();
     
    -                   NettyMessage decodedMsg = null;
    -
    -                   if (msgId == BufferResponse.ID) {
    -                           decodedMsg = new BufferResponse();
    -                   }
    -                   else if (msgId == PartitionRequest.ID) {
    -                           decodedMsg = new PartitionRequest();
    -                   }
    -                   else if (msgId == TaskEventRequest.ID) {
    -                           decodedMsg = new TaskEventRequest();
    -                   }
    -                   else if (msgId == ErrorResponse.ID) {
    -                           decodedMsg = new ErrorResponse();
    -                   }
    -                   else if (msgId == CancelPartitionRequest.ID) {
    -                           decodedMsg = new CancelPartitionRequest();
    -                   }
    -                   else if (msgId == CloseRequest.ID) {
    -                           decodedMsg = new CloseRequest();
    -                   }
    -                   else {
    -                           throw new IllegalStateException("Received unknown message from producer: " + msg);
    +                   final NettyMessage decodedMsg;
    +                   switch (msgId) {
    +                           case BufferResponse.ID:
    +                                   decodedMsg = BufferResponse.readFrom(msg);
    +                                   break;
    +                           case PartitionRequest.ID:
    +                                   decodedMsg = PartitionRequest.readFrom(msg);
    +                                   break;
    +                           case TaskEventRequest.ID:
    +                                   decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
    +                                   break;
    +                           case ErrorResponse.ID:
    +                                   decodedMsg = ErrorResponse.readFrom(msg);
    +                                   break;
    +                           case CancelPartitionRequest.ID:
    +                                   decodedMsg = CancelPartitionRequest.readFrom(msg);
    +                                   break;
    +                           case CloseRequest.ID:
    +                                   decodedMsg = CloseRequest.readFrom(msg);
    +                                   break;
    +                           default:
    +                                   throw new IllegalStateException("Received unknown message from producer: " + msg);
    --- End diff --
    
    I agree that `IllegalStateException` is not the best fit here, but
    `UnsupportedOperationException` is not a good fit either. What do you think
    about `ProtocolException`?
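
    To make the suggestion concrete, here is a minimal sketch of what the
    `default` branch could look like. It assumes `ProtocolException` means
    `java.net.ProtocolException`; the class name, the numeric IDs and the
    string return values are hypothetical stand-ins for the real
    `NettyMessage` subclasses and their `readFrom` methods.

```java
import java.net.ProtocolException;

/** Sketch only: stands in for the message-type dispatch in the diff above. */
final class MessageDispatchSketch {

    // Hypothetical IDs for illustration; the real values live on the NettyMessage subclasses.
    static final byte BUFFER_RESPONSE_ID = 0;
    static final byte PARTITION_REQUEST_ID = 1;
    static final byte CLOSE_REQUEST_ID = 2;

    /**
     * Maps a message ID to the name of the message type it denotes.
     * An unknown ID means the peer sent something outside the wire protocol,
     * so it is reported as a checked ProtocolException instead of an
     * IllegalStateException.
     */
    static String messageTypeFor(byte msgId) throws ProtocolException {
        switch (msgId) {
            case BUFFER_RESPONSE_ID:
                return "BufferResponse";
            case PARTITION_REQUEST_ID:
                return "PartitionRequest";
            case CLOSE_REQUEST_ID:
                return "CloseRequest";
            default:
                throw new ProtocolException("Received unknown message from producer: " + msgId);
        }
    }
}
```

    One thing to keep in mind: `java.net.ProtocolException` extends
    `IOException` and is therefore checked, so the enclosing method has to
    declare or handle it. The `decode` method in the diff overrides a Netty
    decoder method that is declared with `throws Exception`, so it should be
    able to propagate it, but that is worth double-checking against the
    actual signature.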

