This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f957e3f [FLINK-25441][network] Wrap failure cause with ProducerFailedException only for PipelinedSubpartitionView.
f957e3f is described below
commit f957e3fee50e734dd6b2cbf0cbbef00fe810cd32
Author: Lijie Wang <[email protected]>
AuthorDate: Wed Jan 12 16:11:30 2022 +0800
[FLINK-25441][network] Wrap failure cause with ProducerFailedException only for PipelinedSubpartitionView.
---
.../io/network/netty/PartitionRequestQueue.java | 5 +--
.../partition/PipelinedSubpartitionView.java | 6 +++-
.../network/partition/ResultSubpartitionView.java | 8 +++++
.../network/netty/PartitionRequestQueueTest.java | 42 ----------------------
.../partition/PipelinedSubpartitionTest.java | 26 ++++++++++++++
5 files changed, 40 insertions(+), 47 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 397902f..dd9ea5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
-import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -282,9 +281,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
Throwable cause = reader.getFailureCause();
if (cause != null) {
- ErrorResponse msg =
- new ErrorResponse(
- new ProducerFailedException(cause), reader.getReceiverId());
+ ErrorResponse msg = new ErrorResponse(cause, reader.getReceiverId());
ctx.writeAndFlush(msg);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index fa3efd6..757c420 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -91,7 +91,11 @@ public class PipelinedSubpartitionView implements ResultSubpartitionView {
@Override
public Throwable getFailureCause() {
- return parent.getFailureCause();
+ Throwable cause = parent.getFailureCause();
+ if (cause != null) {
+ return new ProducerFailedException(cause);
+ }
+ return null;
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index 4ee89bb..b69fac0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -55,6 +55,14 @@ public interface ResultSubpartitionView {
void acknowledgeAllDataProcessed();
+ /**
+ * {@link ResultSubpartitionView} can decide whether the failure cause should be reported to the
+ * consumer as a failure (primary failure) or as a {@link ProducerFailedException} (secondary failure).
+ * A secondary failure can be reported only if the producer (upstream task) is guaranteed to fail over.
+ *
+ * <p><strong>BEWARE:</strong> Incorrectly reporting the failure cause as a primary failure can hide
+ * the root cause of the failure from the user.
+ */
Throwable getFailureCause();
AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
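[Editor's note: the javadoc added above states the contract only in prose. The following is a minimal, hypothetical sketch, not part of this commit or of Flink, of how a consumer-side component could act on that contract. The class and method names are invented for illustration; only ProducerFailedException is an existing Flink type.]

import org.apache.flink.runtime.io.network.partition.ProducerFailedException;

// Hypothetical illustration of the primary vs. secondary failure distinction
// described in the javadoc above; not part of this commit.
final class FailureCauseSketch {

    /** A ProducerFailedException marks a secondary failure: the producer is guaranteed to fail over. */
    static boolean isSecondaryFailure(Throwable cause) {
        return cause instanceof ProducerFailedException;
    }

    static String describe(Throwable cause) {
        if (isSecondaryFailure(cause)) {
            // The upstream task restarts and reports the real root cause itself,
            // so this cause should not be surfaced as the user-facing root cause.
            return "secondary failure (producer fails over): " + cause;
        }
        // Primary failure: this cause is what the user should see as the root cause.
        return "primary failure: " + cause;
    }
}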
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index efc66b0..34b9497 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.io.network.netty;
-import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
@@ -139,34 +138,6 @@ public class PartitionRequestQueueTest {
assertEquals(buffersToWrite, channel.outboundMessages().size());
}
- @Test
- public void testProducerFailedException() throws Exception {
- PartitionRequestQueue queue = new PartitionRequestQueue();
-
- ResultSubpartitionView view = new ReleasedResultSubpartitionView();
-
- ResultPartitionProvider partitionProvider =
- (partitionId, index, availabilityListener) -> view;
-
- EmbeddedChannel ch = new EmbeddedChannel(queue);
-
- CreditBasedSequenceNumberingViewReader seqView =
- new CreditBasedSequenceNumberingViewReader(new InputChannelID(), 2, queue);
- seqView.requestSubpartitionView(partitionProvider, new ResultPartitionID(), 0);
- // Add available buffer to trigger enqueue the erroneous view
- seqView.notifyDataAvailable();
-
- ch.runPendingTasks();
-
- // Read the enqueued msg
- Object msg = ch.readOutbound();
-
- assertEquals(msg.getClass(), NettyMessage.ErrorResponse.class);
-
- NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
- assertTrue(err.cause instanceof CancelTaskException);
- }
-
/** Tests {@link PartitionRequestQueue} buffer writing with default buffers. */
@Test
public void testDefaultBufferWriting() throws Exception {
@@ -265,19 +236,6 @@ public class PartitionRequestQueueTest {
}
}
- private static class ReleasedResultSubpartitionView
- extends EmptyAlwaysAvailableResultSubpartitionView {
- @Override
- public boolean isReleased() {
- return true;
- }
-
- @Override
- public Throwable getFailureCause() {
- return new RuntimeException("Expected test exception");
- }
- }
-
/**
* Tests {@link PartitionRequestQueue#enqueueAvailableReader(NetworkSequenceViewReader)},
* verifying the reader would be enqueued in the pipeline if the next sending buffer is an event
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 1fb8edf..d5254c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -336,6 +337,18 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
assertEquals(-1, subpartition.add(createFilledFinishedBufferConsumer(4)));
}
+ @Test
+ public void testProducerFailedException() {
+ PipelinedSubpartition subpartition =
+ new FailurePipelinedSubpartition(0, 2, PartitionTestUtils.createPartition());
+
+ ResultSubpartitionView view =
+ subpartition.createReadView(new NoOpBufferAvailablityListener());
+
+ assertNotNull(view.getFailureCause());
+ assertTrue(view.getFailureCause() instanceof CancelTaskException);
+ }
+
private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition)
throws Exception {
// Add a bufferConsumer
@@ -369,4 +382,17 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
public static PipelinedSubpartition createPipelinedSubpartition(ResultPartition parent) {
return new PipelinedSubpartition(0, 2, parent);
}
+
+ private static class FailurePipelinedSubpartition extends PipelinedSubpartition {
+
+ FailurePipelinedSubpartition(
+ int index, int receiverExclusiveBuffersPerChannel, ResultPartition parent) {
+ super(index, receiverExclusiveBuffersPerChannel, parent);
+ }
+
+ @Override
+ Throwable getFailureCause() {
+ return new RuntimeException("Expected test exception");
+ }
+ }
}