This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1ce71bf [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
1ce71bf is described below
commit 1ce71bf4c1b48af5254a48e847692048ff94ee57
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed Jan 26 11:06:46 2022 +0800
[FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
Currently, if the partition file of a blocking shuffle has been lost, a FileNotFoundException is thrown and the lost partition data is never regenerated. This change throws PartitionNotFoundException instead, so the lost partition can be regenerated.
This closes #18515.
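
In short, the fix adds a readability guard at the point where a subpartition reader is created. The following is a minimal, self-contained sketch of that pattern, not the actual Flink code; SpilledPartitionGuard and PartitionUnavailableException are hypothetical stand-ins for the real classes touched in the diff below.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Hypothetical stand-in for Flink's PartitionNotFoundException.
    class PartitionUnavailableException extends IOException {
        PartitionUnavailableException(String partitionId) {
            super("Result partition not found: " + partitionId);
        }
    }

    final class SpilledPartitionGuard {
        // Check that the spilled file still exists and is readable before a
        // reader is handed out. A deleted file then surfaces as a
        // partition-level error the scheduler can react to, instead of a
        // FileNotFoundException thrown later from deep inside the read path.
        static void ensureReadable(Path dataFile, String partitionId)
                throws PartitionUnavailableException {
            if (!Files.isReadable(dataFile)) {
                throw new PartitionUnavailableException(partitionId);
            }
        }
    }
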
---
.../partition/BoundedBlockingSubpartition.java | 5 ++
.../io/network/partition/PartitionedFile.java | 5 ++
.../partition/SortMergeResultPartition.java | 4 +
.../flink/test/runtime/BlockingShuffleITCase.java | 95 +++++++++++++++++++---
4 files changed, 96 insertions(+), 13 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index c12adbd..0a5b0fc 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;
@@ -213,6 +214,10 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
checkState(!isReleased, "data partition already released");
checkState(isFinished, "writing of blocking partition not yet finished");
+ if (!Files.isReadable(data.getFilePath())) {
+ throw new PartitionNotFoundException(parent.getPartitionId());
+ }
+
final ResultSubpartitionView reader;
if (useDirectFileTransfer) {
reader =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
index dd34fe0..9f6018c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.file.Files;
import java.nio.file.Path;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -109,6 +110,10 @@ public class PartitionedFile {
return numRegions;
}
+ public boolean isReadable() {
+ return Files.isReadable(dataFilePath) && Files.isReadable(indexFilePath);
+ }
+
/**
* Returns the index entry offset of the target region and subpartition in the index file. Both
* region index and subpartition index start from 0.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index b94191b..695800e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -445,6 +445,10 @@ public class SortMergeResultPartition extends ResultPartition {
checkState(!isReleased(), "Partition released.");
checkState(isFinished(), "Trying to read unfinished blocking partition.");
+ if (!resultFile.isReadable()) {
+ throw new PartitionNotFoundException(getPartitionId());
+ }
+
return readScheduler.createSubpartitionReader(
availabilityListener, subpartitionIndex, resultFile);
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index ce788a8..ac99299 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -19,19 +19,27 @@
package org.apache.flink.test.runtime;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
@@ -44,10 +52,12 @@ public class BlockingShuffleITCase {
private final int numSlotsPerTaskManager = 4;
+ @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
@Test
public void testBoundedBlockingShuffle() throws Exception {
- JobGraph jobGraph = createJobGraph(1000000);
- Configuration configuration = new Configuration();
+ JobGraph jobGraph = createJobGraph(1000000, false);
+ Configuration configuration = getConfiguration();
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);
@@ -58,8 +68,8 @@ public class BlockingShuffleITCase {
@Test
public void testBoundedBlockingShuffleWithoutData() throws Exception {
- JobGraph jobGraph = createJobGraph(0);
- Configuration configuration = new Configuration();
+ JobGraph jobGraph = createJobGraph(0, false);
+ Configuration configuration = getConfiguration();
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
Integer.MAX_VALUE);
@@ -70,35 +80,63 @@ public class BlockingShuffleITCase {
@Test
public void testSortMergeBlockingShuffle() throws Exception {
- Configuration configuration = new Configuration();
+ Configuration configuration = getConfiguration();
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
- JobGraph jobGraph = createJobGraph(1000000);
+ JobGraph jobGraph = createJobGraph(1000000, false);
JobGraphRunningUtil.execute(
jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
}
@Test
public void testSortMergeBlockingShuffleWithoutData() throws Exception {
- Configuration configuration = new Configuration();
+ Configuration configuration = getConfiguration();
configuration.setInteger(
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
- JobGraph jobGraph = createJobGraph(0);
+ JobGraph jobGraph = createJobGraph(0, false);
+ JobGraphRunningUtil.execute(
+ jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+ }
+
+ @Test
+ public void testDeletePartitionFileOfBoundedBlockingShuffle() throws
Exception {
+ Configuration configuration = getConfiguration();
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+ Integer.MAX_VALUE);
+
+ JobGraph jobGraph = createJobGraph(0, true);
+ JobGraphRunningUtil.execute(
+ jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+ }
+
+ @Test
+ public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
+ Configuration configuration = getConfiguration();
+ JobGraph jobGraph = createJobGraph(0, true);
JobGraphRunningUtil.execute(
jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
}
- private JobGraph createJobGraph(int numRecordsToSend) {
+ private Configuration getConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.set(CoreOptions.TMP_DIRS, TEMP_FOLDER.getRoot().getAbsolutePath());
+ configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+ return configuration;
+ }
+
+ private JobGraph createJobGraph(int numRecordsToSend, boolean deletePartitionFile) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
env.setBufferTimeout(-1);
env.setParallelism(numTaskManagers * numSlotsPerTaskManager);
DataStream<String> source = env.addSource(new StringSource(numRecordsToSend));
source.rebalance()
.map((MapFunction<String, String>) value -> value)
.broadcast()
- .addSink(new VerifySink());
+ .addSink(new VerifySink(deletePartitionFile));
StreamGraph streamGraph = env.getStreamGraph();
streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
@@ -130,11 +168,42 @@ public class BlockingShuffleITCase {
}
}
- private static class VerifySink implements SinkFunction<String> {
+ private static class VerifySink extends RichSinkFunction<String> {
+ private final boolean deletePartitionFile;
+
+ VerifySink(boolean deletePartitionFile) {
+ this.deletePartitionFile = deletePartitionFile;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() > 0) {
+ return;
+ }
+
+ synchronized (BlockingShuffleITCase.class) {
+ deleteFiles(TEMP_FOLDER.getRoot());
+ }
+ }
@Override
- public void invoke(String value) throws Exception {
+ public void invoke(String value, Context context) throws Exception {
assertEquals(RECORD, value);
}
+
+ private static void deleteFiles(File root) throws IOException {
+ File[] files = root.listFiles();
+ if (files == null || files.length == 0) {
+ return;
+ }
+
+ for (File file : files) {
+ if (!file.isDirectory()) {
+ Files.deleteIfExists(file.toPath());
+ } else {
+ deleteFiles(file);
+ }
+ }
+ }
}
}
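
The new tests above verify recovery end to end: they delete the spilled files during the sink's first attempt and rely on fixedDelayRestart(1, 0L) so the job can only succeed if the lost partitions are regenerated. For illustration, the guard pattern alone can be exercised in isolation as follows, reusing the hypothetical names from the sketch near the top of this mail.

    import java.nio.file.Files;
    import java.nio.file.Path;

    public class SpilledPartitionGuardExample {
        public static void main(String[] args) throws Exception {
            Path file = Files.createTempFile("partition", ".data");
            SpilledPartitionGuard.ensureReadable(file, "p-0"); // file exists: no exception

            Files.delete(file); // simulate losing the partition file
            try {
                SpilledPartitionGuard.ensureReadable(file, "p-0");
                throw new AssertionError("expected PartitionUnavailableException");
            } catch (PartitionUnavailableException expected) {
                // the consumer-visible signal that allows regeneration,
                // rather than a raw FileNotFoundException from the read path
            }
        }
    }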