This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new ca8dbcd  [FLINK-21788][network] Throw PartitionNotFoundException if 
the partition file has been lost for blocking shuffle
ca8dbcd is described below

commit ca8dbcdcadd5262645761c340a89c86bfce7446f
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed Jan 26 11:06:46 2022 +0800

    [FLINK-21788][network] Throw PartitionNotFoundException if the partition 
file has been lost for blocking shuffle
    
    Currently, if the partition file of a blocking shuffle has been lost, a 
FileNotFoundException is thrown and the partition data is not regenerated. 
This change throws a PartitionNotFoundException instead, so that the lost 
partition data can be regenerated.
    
    This closes #18515.
---
 .../partition/BoundedBlockingSubpartition.java     |  5 ++
 .../io/network/partition/PartitionedFile.java      |  5 ++
 .../partition/SortMergeResultPartition.java        |  4 +
 .../flink/test/runtime/BlockingShuffleITCase.java  | 94 +++++++++++++++++++---
 4 files changed, 95 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 4656c75..4a22f1d 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -213,6 +214,10 @@ final class BoundedBlockingSubpartition extends 
ResultSubpartition {
             checkState(!isReleased, "data partition already released");
             checkState(isFinished, "writing of blocking partition not yet 
finished");
 
+            if (!Files.isReadable(data.getFilePath())) {
+                throw new PartitionNotFoundException(parent.getPartitionId());
+            }
+
             final ResultSubpartitionView reader;
             if (useDirectFileTransfer) {
                 reader =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
index dd34fe0..9f6018c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
 import java.nio.file.Path;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -109,6 +110,10 @@ public class PartitionedFile {
         return numRegions;
     }
 
+    public boolean isReadable() {
+        return Files.isReadable(dataFilePath) && 
Files.isReadable(indexFilePath);
+    }
+
     /**
      * Returns the index entry offset of the target region and subpartition in 
the index file. Both
      * region index and subpartition index start from 0.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 9382503..6335ddb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -442,6 +442,10 @@ public class SortMergeResultPartition extends 
ResultPartition {
             checkState(!isReleased(), "Partition released.");
             checkState(isFinished(), "Trying to read unfinished blocking 
partition.");
 
+            if (!resultFile.isReadable()) {
+                throw new PartitionNotFoundException(getPartitionId());
+            }
+
             return readScheduler.crateSubpartitionReader(
                     availabilityListener, subpartitionIndex, resultFile);
         }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index c1df574..9eddf7a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -19,19 +19,27 @@
 package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 
@@ -44,53 +52,82 @@ public class BlockingShuffleITCase {
 
     private final int numSlotsPerTaskManager = 4;
 
+    @ClassRule public static final TemporaryFolder TEMP_FOLDER = new 
TemporaryFolder();
+
     @Test
     public void testBoundedBlockingShuffle() throws Exception {
-        JobGraph jobGraph = createJobGraph(1000000);
-        Configuration configuration = new Configuration();
+        JobGraph jobGraph = createJobGraph(1000000, false);
+        Configuration configuration = getConfiguration();
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, 
numSlotsPerTaskManager);
     }
 
     @Test
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
-        JobGraph jobGraph = createJobGraph(0);
-        Configuration configuration = new Configuration();
+        JobGraph jobGraph = createJobGraph(0, false);
+        Configuration configuration = getConfiguration();
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, 
numSlotsPerTaskManager);
     }
 
     @Test
     public void testSortMergeBlockingShuffle() throws Exception {
-        Configuration configuration = new Configuration();
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
 
-        JobGraph jobGraph = createJobGraph(1000000);
+        JobGraph jobGraph = createJobGraph(1000000, false);
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, 
numSlotsPerTaskManager);
     }
 
     @Test
     public void testSortMergeBlockingShuffleWithoutData() throws Exception {
-        Configuration configuration = new Configuration();
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
 
-        JobGraph jobGraph = createJobGraph(0);
+        JobGraph jobGraph = createJobGraph(0, false);
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, numTaskManagers, 
numSlotsPerTaskManager);
+    }
+
+    @Test
+    public void testDeletePartitionFileOfBoundedBlockingShuffle() throws 
Exception {
+        Configuration configuration = getConfiguration();
+        JobGraph jobGraph = createJobGraph(0, true);
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, 
numSlotsPerTaskManager);
     }
 
-    private JobGraph createJobGraph(int numRecordsToSend) {
+    @Test
+    public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws 
Exception {
+        Configuration configuration = getConfiguration();
+        configuration.setInteger(
+                
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+
+        JobGraph jobGraph = createJobGraph(0, true);
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, numTaskManagers, 
numSlotsPerTaskManager);
+    }
+
+    private Configuration getConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(CoreOptions.TMP_DIRS, 
TEMP_FOLDER.getRoot().getAbsolutePath());
+        
configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 
100);
+        return configuration;
+    }
+
+    private JobGraph createJobGraph(int numRecordsToSend, boolean 
deletePartitionFile) {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
         env.setBufferTimeout(-1);
         env.setParallelism(numTaskManagers * numSlotsPerTaskManager);
         DataStream<String> source = env.addSource(new 
StringSource(numRecordsToSend));
         source.rebalance()
                 .map((MapFunction<String, String>) value -> value)
                 .broadcast()
-                .addSink(new VerifySink());
+                .addSink(new VerifySink(deletePartitionFile));
 
         StreamGraph streamGraph = env.getStreamGraph();
         
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
@@ -122,11 +159,42 @@ public class BlockingShuffleITCase {
         }
     }
 
-    private static class VerifySink implements SinkFunction<String> {
+    private static class VerifySink extends RichSinkFunction<String> {
+        private final boolean deletePartitionFile;
+
+        VerifySink(boolean deletePartitionFile) {
+            this.deletePartitionFile = deletePartitionFile;
+        }
 
         @Override
-        public void invoke(String value) throws Exception {
+        public void open(Configuration parameters) throws Exception {
+            if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() 
> 0) {
+                return;
+            }
+
+            synchronized (BlockingShuffleITCase.class) {
+                deleteFiles(TEMP_FOLDER.getRoot());
+            }
+        }
+
+        @Override
+        public void invoke(String value, Context context) throws Exception {
             assertEquals(RECORD, value);
         }
+
+        private static void deleteFiles(File root) throws IOException {
+            File[] files = root.listFiles();
+            if (files == null || files.length == 0) {
+                return;
+            }
+
+            for (File file : files) {
+                if (!file.isDirectory()) {
+                    Files.deleteIfExists(file.toPath());
+                } else {
+                    deleteFiles(file);
+                }
+            }
+        }
     }
 }

Reply via email to