This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45c62cacc0addde2d1d87e35d695d885140bdaa8
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Feb 17 15:58:16 2026 +0100

    [FLINK-39103][runtime] Tolerate deserializer release failure if the 
corresponding channel already had errors
    
    On cleanup, the deserializer recycles its buffers, potentially notifying the
    input channel.
    
    However, if the input channel has encountered an error (such as
    RemoteTransportException), then the notification will fail, which might cause
    the whole cleanup to fail and lead to TM shutdown.
---
 .../network/partition/consumer/InputChannel.java   |  5 ++
 .../runtime/io/AbstractStreamTaskNetworkInput.java | 23 ++++++-
 .../partition/consumer/InputChannelTest.java       | 10 +++
 .../consumer/StreamTestSingleInputGate.java        |  5 ++
 .../runtime/io/StreamTaskNetworkInputTest.java     | 75 ++++++++++++++++++++++
 5 files changed, 116 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 1d5efb19b90..e2969c675ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -346,6 +346,11 @@ public abstract class InputChannel {
      */
     public void notifyRequiredSegmentId(int subpartitionId, int segmentId) 
throws IOException {}
 
+    /** Returns whether this channel has recorded an error (a non-null {@code cause}). */
+    public boolean hasError() {
+        return cause.get() != null;
+    }
+
     // ------------------------------------------------------------------------
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
index c6053248aff..a3743edec7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.event.WatermarkEvent;
 import org.apache.flink.runtime.io.network.api.EndOfData;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import 
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
@@ -39,6 +40,9 @@ import 
org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.util.watermark.WatermarkUtils;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -59,6 +63,9 @@ import static org.apache.flink.util.Preconditions.checkState;
 public abstract class AbstractStreamTaskNetworkInput<
                 T, R extends 
RecordDeserializer<DeserializationDelegate<StreamElement>>>
         implements StreamTaskInput<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractStreamTaskNetworkInput.class);
+
     protected final CheckpointedInputGate checkpointedInputGate;
     protected final DeserializationDelegate<StreamElement> 
deserializationDelegate;
     protected final TypeSerializer<T> inputSerializer;
@@ -315,13 +322,25 @@ public abstract class AbstractStreamTaskNetworkInput<
 
     @Override
     public void close() throws IOException {
-        // release the deserializers . this part should not ever fail
+        // WARNING: throwing from here might fail the Task closing procedure and terminate the TM.
+        // Transport-error release failures on channels that had already failed are only logged.
         Exception err = null;
         for (InputChannelInfo channelInfo : new ArrayList<>(recordDeserializers.keySet())) {
+            final boolean hadError =
+                    checkpointedInputGate.getChannel(channelInfo.getInputChannelIdx()).hasError();
             try {
                 releaseDeserializer(channelInfo);
             } catch (Exception e) {
-                err = e;
+                if (hadError
+                        && ExceptionUtils.findThrowable(e, RemoteTransportException.class)
+                                .isPresent()) {
+                    LOG.warn(
+                            "Ignoring deserializer release failure - the channel {} has encountered a transport error before: {}",
+                            channelInfo,
+                            e.getMessage());
+                } else {
+                    err = ExceptionUtils.firstOrSuppressed(e, err);
+                }
             }
         }
         if (err != null) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index 8a763057472..866636a3061 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -101,6 +101,16 @@ class InputChannelTest {
         assertThat(ch.getCurrentBackoff()).isZero();
     }
 
+    @Test
+    void testHasError() {
+        final InputChannel channel = createInputChannel(0, 0);
+        assertThat(channel.hasError()).isFalse();
+
+        final RuntimeException error = new RuntimeException("test error");
+        channel.setError(error);
+        assertThat(channel.hasError()).isTrue();
+    }
+
     private InputChannel createInputChannel(int initialBackoff, int 
maxBackoff) {
         return new MockInputChannel(
                 mock(SingleInputGate.class),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index cf7418d8780..29ad635d1e9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -205,6 +205,11 @@ public class StreamTestSingleInputGate<T> {
         }
     }
 
+    /** Sets the given {@code error} on the input channel at index {@code channel}. */
+    public void setChannelError(int channel, Throwable error) {
+        inputChannels[channel].setError(error);
+    }
+
     /** Returns true iff all input queues are empty. */
     public boolean allQueuesEmpty() {
         for (int i = 0; i < numInputChannels; i++) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index c9cc91b90a2..d69826e05be 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import 
org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
 import 
org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
@@ -64,7 +65,10 @@ import org.apache.flink.util.clock.SystemClock;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -78,10 +82,17 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link StreamTaskNetworkInput}. */
 class StreamTaskNetworkInputTest {
 
+    private static final RuntimeException WRAPPED_TRANSPORT_EXCEPTION =
+            new RuntimeException(
+                    new RemoteTransportException(
+                            "simulated release failure",
+                            InetSocketAddress.createUnresolved("localhost", 
8080)));
+
     private static final int PAGE_SIZE = 1000;
 
     private final IOManager ioManager = new IOManagerAsync();
@@ -473,6 +484,55 @@ class StreamTaskNetworkInputTest {
         }
     }
 
+    @Test
+    void testCloseIgnoresReleaseFailureFromChannelWithError() throws Exception {
+        testErrorHandlingOnClose(WRAPPED_TRANSPORT_EXCEPTION, WRAPPED_TRANSPORT_EXCEPTION, null);
+    }
+
+    @Test
+    void testCloseThrowsReleaseFailureFromChannelWithoutError() throws Exception {
+        testErrorHandlingOnClose(WRAPPED_TRANSPORT_EXCEPTION, null, RuntimeException.class);
+    }
+
+    @Test
+    void testCloseThrowsReleaseFailureFromChannelWithNonTransportError() throws Exception {
+        RuntimeException error = new RuntimeException("something else broke");
+        testErrorHandlingOnClose(error, error, RuntimeException.class);
+    }
+
+    private void testErrorHandlingOnClose(
+            RuntimeException error, // thrown by channel 0's deserializer on clear()
+            @Nullable RuntimeException channelError, // set on channel 0 before close()
+            @Nullable Class<? extends Exception> expectedException) // null: close() must succeed
+            throws Exception {
+        int numInputChannels = 2;
+        LongSerializer inSerializer = LongSerializer.INSTANCE;
+        StreamTestSingleInputGate<Long> inputGate =
+                new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
+
+        Map<InputChannelInfo, TestRecordDeserializer> deserializers =
+                createDeserializers(inputGate.getInputGate());
+
+        // Replace the deserializer for channel 0 with one that throws `error` on clear()
+        InputChannelInfo channel0 = new InputChannelInfo(0, 0);
+        deserializers.put(
+                channel0,
+                new ThrowingTestRecordDeserializer(ioManager.getSpillingDirectoriesPaths(), error));
+
+        StreamTaskInput<Long> input =
+                new TestStreamTaskNetworkInput(
+                        inputGate, inSerializer, numInputChannels, deserializers);
+
+        if (channelError != null) {
+            inputGate.setChannelError(0, channelError);
+        }
+        if (expectedException == null) {
+            input.close();
+        } else {
+            assertThatThrownBy(input::close).isInstanceOf(expectedException);
+        }
+    }
+
     private static class TestStreamTaskNetworkInput
             extends AbstractStreamTaskNetworkInput<Long, 
TestRecordDeserializer> {
         public TestStreamTaskNetworkInput(
@@ -496,4 +556,19 @@ class StreamTaskNetworkInputTest {
             throw new UnsupportedOperationException();
         }
     }
+
+    private static class ThrowingTestRecordDeserializer extends TestRecordDeserializer {
+
+        private final RuntimeException failure;
+
+        public ThrowingTestRecordDeserializer(String[] tmpDirectories, RuntimeException failure) {
+            super(tmpDirectories);
+            this.failure = failure;
+        }
+
+        @Override
+        public void clear() {
+            throw failure; // always throws the exception given to the constructor
+        }
+    }
 }

Reply via email to