This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6710045013ea8236cee06f48bc0c73a029f93bfe Author: Roman Khachatryan <[email protected]> AuthorDate: Mon Nov 9 14:07:43 2020 +0100 [hotfix][network] Report channel index if failed to deserialize --- .../apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java index 5084b8f..850d2ac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java @@ -142,7 +142,12 @@ public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> { while (true) { // get the stream element from the deserializer if (currentRecordDeserializer != null) { - DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate); + DeserializationResult result; + try { + result = currentRecordDeserializer.getNextRecord(deserializationDelegate); + } catch (IOException e) { + throw new IOException(String.format("Can't get next record for channel %d", lastChannel), e); + } if (result.isBufferConsumed()) { currentRecordDeserializer.getCurrentBuffer().recycleBuffer(); currentRecordDeserializer = null;
