This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f957e3f  [FLINK-25441][network] Wrap failure cause with ProducerFailedException only for PipelinedSubpartitionView.
f957e3f is described below

commit f957e3fee50e734dd6b2cbf0cbbef00fe810cd32
Author: Lijie Wang <[email protected]>
AuthorDate: Wed Jan 12 16:11:30 2022 +0800

    [FLINK-25441][network] Wrap failure cause with ProducerFailedException only for PipelinedSubpartitionView.
---
 .../io/network/netty/PartitionRequestQueue.java    |  5 +--
 .../partition/PipelinedSubpartitionView.java       |  6 +++-
 .../network/partition/ResultSubpartitionView.java  |  8 +++++
 .../network/netty/PartitionRequestQueueTest.java   | 42 ----------------------
 .../partition/PipelinedSubpartitionTest.java       | 26 ++++++++++++++
 5 files changed, 40 insertions(+), 47 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 397902f..dd9ea5f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
-import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -282,9 +281,7 @@ class PartitionRequestQueue extends 
ChannelInboundHandlerAdapter {
 
                     Throwable cause = reader.getFailureCause();
                     if (cause != null) {
-                        ErrorResponse msg =
-                                new ErrorResponse(
-                                        new ProducerFailedException(cause), 
reader.getReceiverId());
+                        ErrorResponse msg = new ErrorResponse(cause, 
reader.getReceiverId());
 
                         ctx.writeAndFlush(msg);
                     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index fa3efd6..757c420 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -91,7 +91,11 @@ public class PipelinedSubpartitionView implements 
ResultSubpartitionView {
 
     @Override
     public Throwable getFailureCause() {
-        return parent.getFailureCause();
+        Throwable cause = parent.getFailureCause();
+        if (cause != null) {
+            return new ProducerFailedException(cause);
+        }
+        return null;
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index 4ee89bb..b69fac0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -55,6 +55,14 @@ public interface ResultSubpartitionView {
 
     void acknowledgeAllDataProcessed();
 
+    /**
+     * {@link ResultSubpartitionView} can decide whether the failure cause 
should be reported to
+     * consumer as failure (primary failure) or {@link 
ProducerFailedException} (secondary failure).
+     * Secondary failure can be reported only if producer (upstream task) is 
guaranteed to failover.
+     *
+     * <p><strong>BEWARE:</strong> Incorrectly reporting failure cause as 
primary failure, can hide
+     * the root cause of the failure from the user.
+     */
     Throwable getFailureCause();
 
     AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index efc66b0..34b9497 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
@@ -139,34 +138,6 @@ public class PartitionRequestQueueTest {
         assertEquals(buffersToWrite, channel.outboundMessages().size());
     }
 
-    @Test
-    public void testProducerFailedException() throws Exception {
-        PartitionRequestQueue queue = new PartitionRequestQueue();
-
-        ResultSubpartitionView view = new ReleasedResultSubpartitionView();
-
-        ResultPartitionProvider partitionProvider =
-                (partitionId, index, availabilityListener) -> view;
-
-        EmbeddedChannel ch = new EmbeddedChannel(queue);
-
-        CreditBasedSequenceNumberingViewReader seqView =
-                new CreditBasedSequenceNumberingViewReader(new 
InputChannelID(), 2, queue);
-        seqView.requestSubpartitionView(partitionProvider, new 
ResultPartitionID(), 0);
-        // Add available buffer to trigger enqueue the erroneous view
-        seqView.notifyDataAvailable();
-
-        ch.runPendingTasks();
-
-        // Read the enqueued msg
-        Object msg = ch.readOutbound();
-
-        assertEquals(msg.getClass(), NettyMessage.ErrorResponse.class);
-
-        NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
-        assertTrue(err.cause instanceof CancelTaskException);
-    }
-
     /** Tests {@link PartitionRequestQueue} buffer writing with default 
buffers. */
     @Test
     public void testDefaultBufferWriting() throws Exception {
@@ -265,19 +236,6 @@ public class PartitionRequestQueueTest {
         }
     }
 
-    private static class ReleasedResultSubpartitionView
-            extends EmptyAlwaysAvailableResultSubpartitionView {
-        @Override
-        public boolean isReleased() {
-            return true;
-        }
-
-        @Override
-        public Throwable getFailureCause() {
-            return new RuntimeException("Expected test exception");
-        }
-    }
-
     /**
      * Tests {@link 
PartitionRequestQueue#enqueueAvailableReader(NetworkSequenceViewReader)},
      * verifying the reader would be enqueued in the pipeline if the next 
sending buffer is an event
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 1fb8edf..d5254c2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -336,6 +337,18 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
         assertEquals(-1, 
subpartition.add(createFilledFinishedBufferConsumer(4)));
     }
 
+    @Test
+    public void testProducerFailedException() {
+        PipelinedSubpartition subpartition =
+                new FailurePipelinedSubpartition(0, 2, 
PartitionTestUtils.createPartition());
+
+        ResultSubpartitionView view =
+                subpartition.createReadView(new 
NoOpBufferAvailablityListener());
+
+        assertNotNull(view.getFailureCause());
+        assertTrue(view.getFailureCause() instanceof CancelTaskException);
+    }
+
     private void verifyViewReleasedAfterParentRelease(ResultSubpartition 
partition)
             throws Exception {
         // Add a bufferConsumer
@@ -369,4 +382,17 @@ public class PipelinedSubpartitionTest extends 
SubpartitionTestBase {
     public static PipelinedSubpartition 
createPipelinedSubpartition(ResultPartition parent) {
         return new PipelinedSubpartition(0, 2, parent);
     }
+
+    private static class FailurePipelinedSubpartition extends 
PipelinedSubpartition {
+
+        FailurePipelinedSubpartition(
+                int index, int receiverExclusiveBuffersPerChannel, 
ResultPartition parent) {
+            super(index, receiverExclusiveBuffersPerChannel, parent);
+        }
+
+        @Override
+        Throwable getFailureCause() {
+            return new RuntimeException("Expected test exception");
+        }
+    }
 }

Reply via email to