Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/5588#discussion_r171001301
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
---
@@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer,
Class<?> eventClass, ClassLoad
try {
int type = buffer.getInt();
- switch (type) {
- case END_OF_PARTITION_EVENT:
- return
eventClass.equals(EndOfPartitionEvent.class);
- case CHECKPOINT_BARRIER_EVENT:
- return
eventClass.equals(CheckpointBarrier.class);
- case END_OF_SUPERSTEP_EVENT:
- return
eventClass.equals(EndOfSuperstepEvent.class);
- case CANCEL_CHECKPOINT_MARKER_EVENT:
- return
eventClass.equals(CancelCheckpointMarker.class);
- case OTHER_EVENT:
- try {
- final DataInputDeserializer
deserializer = new DataInputDeserializer(buffer);
- final String className =
deserializer.readUTF();
-
- final Class<? extends
AbstractEvent> clazz;
- try {
- clazz =
classLoader.loadClass(className).asSubclass(AbstractEvent.class);
- }
- catch (ClassNotFoundException
e) {
- throw new
IOException("Could not load event class '" + className + "'.", e);
- }
- catch (ClassCastException e) {
- throw new
IOException("The class '" + className + "' is not a valid subclass of '"
- +
AbstractEvent.class.getName() + "'.", e);
- }
- return eventClass.equals(clazz);
- }
- catch (Exception e) {
- throw new IOException("Error
while deserializing or instantiating event.", e);
- }
- default:
- throw new IOException("Corrupt byte
stream for event");
+ if (eventClass.equals(EndOfPartitionEvent.class)) {
+ return type == END_OF_PARTITION_EVENT;
+ } else if (eventClass.equals(CheckpointBarrier.class)) {
+ return type == CHECKPOINT_BARRIER_EVENT;
+ } else if
(eventClass.equals(EndOfSuperstepEvent.class)) {
+ return type == END_OF_SUPERSTEP_EVENT;
+ } else if
(eventClass.equals(CancelCheckpointMarker.class)) {
+ return type == CANCEL_CHECKPOINT_MARKER_EVENT;
+ } else {
+ throw new IOException("Corrupt byte stream for
event or unsupported eventClass = " + eventClass);
--- End diff --
Actually, this should be an `UnsupportedOperationException` since this is
only based on the class being given and not the input stream.
---