This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ce71bf  [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
1ce71bf is described below

commit 1ce71bf4c1b48af5254a48e847692048ff94ee57
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed Jan 26 11:06:46 2022 +0800

    [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
    
    Currently, if the partition file has been lost for blocking shuffle, FileNotFoundException will be thrown and the partition data will not be regenerated. This change makes it throw PartitionNotFoundException instead.
    
    This closes #18515.
---
 .../partition/BoundedBlockingSubpartition.java     |  5 ++
 .../io/network/partition/PartitionedFile.java      |  5 ++
 .../partition/SortMergeResultPartition.java        |  4 +
 .../flink/test/runtime/BlockingShuffleITCase.java  | 95 +++++++++++++++++++---
 4 files changed, 96 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index c12adbd..0a5b0fc 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -213,6 +214,10 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
             checkState(!isReleased, "data partition already released");
             checkState(isFinished, "writing of blocking partition not yet finished");
 
+            if (!Files.isReadable(data.getFilePath())) {
+                throw new PartitionNotFoundException(parent.getPartitionId());
+            }
+
             final ResultSubpartitionView reader;
             if (useDirectFileTransfer) {
                 reader =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
index dd34fe0..9f6018c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
 import java.nio.file.Path;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -109,6 +110,10 @@ public class PartitionedFile {
         return numRegions;
     }
 
+    public boolean isReadable() {
+        return Files.isReadable(dataFilePath) && Files.isReadable(indexFilePath);
+    }
+
     /**
      * Returns the index entry offset of the target region and subpartition in the index file. Both
      * region index and subpartition index start from 0.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index b94191b..695800e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -445,6 +445,10 @@ public class SortMergeResultPartition extends ResultPartition {
             checkState(!isReleased(), "Partition released.");
             checkState(isFinished(), "Trying to read unfinished blocking partition.");
 
+            if (!resultFile.isReadable()) {
+                throw new PartitionNotFoundException(getPartitionId());
+            }
+
             return readScheduler.createSubpartitionReader(
                     availabilityListener, subpartitionIndex, resultFile);
         }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index ce788a8..ac99299 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -19,19 +19,27 @@
 package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 
@@ -44,10 +52,12 @@ public class BlockingShuffleITCase {
 
     private final int numSlotsPerTaskManager = 4;
 
+    @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
     @Test
     public void testBoundedBlockingShuffle() throws Exception {
-        JobGraph jobGraph = createJobGraph(1000000);
-        Configuration configuration = new Configuration();
+        JobGraph jobGraph = createJobGraph(1000000, false);
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
@@ -58,8 +68,8 @@ public class BlockingShuffleITCase {
 
     @Test
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
-        JobGraph jobGraph = createJobGraph(0);
-        Configuration configuration = new Configuration();
+        JobGraph jobGraph = createJobGraph(0, false);
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
                 Integer.MAX_VALUE);
@@ -70,35 +80,63 @@ public class BlockingShuffleITCase {
 
     @Test
     public void testSortMergeBlockingShuffle() throws Exception {
-        Configuration configuration = new Configuration();
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
-        JobGraph jobGraph = createJobGraph(1000000);
+        JobGraph jobGraph = createJobGraph(1000000, false);
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
     @Test
     public void testSortMergeBlockingShuffleWithoutData() throws Exception {
-        Configuration configuration = new Configuration();
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
-        JobGraph jobGraph = createJobGraph(0);
+        JobGraph jobGraph = createJobGraph(0, false);
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+    }
+
+    @Test
+    public void testDeletePartitionFileOfBoundedBlockingShuffle() throws Exception {
+        Configuration configuration = getConfiguration();
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);
+
+        JobGraph jobGraph = createJobGraph(0, true);
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+    }
+
+    @Test
+    public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
+        Configuration configuration = getConfiguration();
+        JobGraph jobGraph = createJobGraph(0, true);
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
-    private JobGraph createJobGraph(int numRecordsToSend) {
+    private Configuration getConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(CoreOptions.TMP_DIRS, TEMP_FOLDER.getRoot().getAbsolutePath());
+        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+        return configuration;
+    }
+
+    private JobGraph createJobGraph(int numRecordsToSend, boolean deletePartitionFile) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
         env.setBufferTimeout(-1);
         env.setParallelism(numTaskManagers * numSlotsPerTaskManager);
         DataStream<String> source = env.addSource(new StringSource(numRecordsToSend));
         source.rebalance()
                 .map((MapFunction<String, String>) value -> value)
                 .broadcast()
-                .addSink(new VerifySink());
+                .addSink(new VerifySink(deletePartitionFile));
 
         StreamGraph streamGraph = env.getStreamGraph();
         
streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
@@ -130,11 +168,42 @@ public class BlockingShuffleITCase {
         }
     }
 
-    private static class VerifySink implements SinkFunction<String> {
+    private static class VerifySink extends RichSinkFunction<String> {
+        private final boolean deletePartitionFile;
+
+        VerifySink(boolean deletePartitionFile) {
+            this.deletePartitionFile = deletePartitionFile;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() > 0) {
+                return;
+            }
+
+            synchronized (BlockingShuffleITCase.class) {
+                deleteFiles(TEMP_FOLDER.getRoot());
+            }
+        }
 
         @Override
-        public void invoke(String value) throws Exception {
+        public void invoke(String value, Context context) throws Exception {
             assertEquals(RECORD, value);
         }
+
+        private static void deleteFiles(File root) throws IOException {
+            File[] files = root.listFiles();
+            if (files == null || files.length == 0) {
+                return;
+            }
+
+            for (File file : files) {
+                if (!file.isDirectory()) {
+                    Files.deleteIfExists(file.toPath());
+                } else {
+                    deleteFiles(file);
+                }
+            }
+        }
     }
 }

Reply via email to