This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 6a4e8fb  [FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
6a4e8fb is described below
commit 6a4e8fbc14b60ab6bf7187afcef389db01999ef6
Author: Yuxin Tan <[email protected]>
AuthorDate: Wed Jan 26 11:06:46 2022 +0800
[FLINK-21788][network] Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
Currently, if the partition file of a blocking shuffle has been lost, a raw
FileNotFoundException is thrown and the partition data is never regenerated.
This change throws a PartitionNotFoundException instead, so the consumer can
report the missing partition and the lost data can be regenerated.
This closes #18515.
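
At its core, the change adds the same readability guard in two places before
a subpartition reader is created. Condensed from the diff below (data, parent,
and resultFile are fields of the surrounding classes):

    // In BoundedBlockingSubpartition: if the spilled file is no longer
    // readable, report a lost partition instead of letting a raw
    // FileNotFoundException escape from the file I/O later.
    if (!Files.isReadable(data.getFilePath())) {
        throw new PartitionNotFoundException(parent.getPartitionId());
    }

    // In SortMergeResultPartition, via the new PartitionedFile#isReadable():
    if (!resultFile.isReadable()) {
        throw new PartitionNotFoundException(getPartitionId());
    }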
---
.../partition/BoundedBlockingSubpartition.java | 5 ++
.../io/network/partition/PartitionedFile.java | 5 ++
.../partition/SortMergeResultPartition.java | 4 +
.../flink/test/runtime/BlockingShuffleITCase.java | 94 +++++++++++++++++++---
4 files changed, 95 insertions(+), 13 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index b8bbd3c..42dc2ec 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -30,6 +30,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.HashSet;
 import java.util.Set;
@@ -213,6 +214,10 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
         checkState(!isReleased, "data partition already released");
         checkState(isFinished, "writing of blocking partition not yet finished");
 
+        if (!Files.isReadable(data.getFilePath())) {
+            throw new PartitionNotFoundException(parent.getPartitionId());
+        }
+
         final ResultSubpartitionView reader;
         if (useDirectFileTransfer) {
             reader =
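
Note that java.nio.file.Files#isReadable returns false both when the file has
been deleted and when it exists but cannot be read, so this single check
covers both failure modes without opening the file. A quick illustration
(the path below is made up):

    // Files.isReadable is false for a nonexistent path; no exception thrown.
    boolean readable = Files.isReadable(Paths.get("/tmp/no-such-partition-file"));
    assert !readable;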
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
index dd34fe0..9f6018c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFile.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
 import java.nio.file.Path;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -109,6 +110,10 @@ public class PartitionedFile {
         return numRegions;
     }
 
+    public boolean isReadable() {
+        return Files.isReadable(dataFilePath) && Files.isReadable(indexFilePath);
+    }
+
     /**
      * Returns the index entry offset of the target region and subpartition in the index file. Both
      * region index and subpartition index start from 0.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index bea424a..a514012 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -444,6 +444,10 @@ public class SortMergeResultPartition extends ResultPartition {
         checkState(!isReleased(), "Partition released.");
         checkState(isFinished(), "Trying to read unfinished blocking partition.");
 
+        if (!resultFile.isReadable()) {
+            throw new PartitionNotFoundException(getPartitionId());
+        }
+
         return readScheduler.createSubpartitionReader(
                 availabilityListener, subpartitionIndex, resultFile);
     }
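
A unit-style check of the new helper could look like the sketch below; it is
illustrative only and not part of this patch (partitionedFile stands for an
already-written instance, and getDataFilePath() is assumed to be the existing
accessor on PartitionedFile):

    // Deleting the data file should make the whole partitioned file
    // unreadable, since isReadable() requires both data and index files.
    Files.delete(partitionedFile.getDataFilePath());
    assertFalse(partitionedFile.isReadable());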
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index 4c50bc7..4cd2c33 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -19,19 +19,27 @@
 package org.apache.flink.test.runtime;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
@@ -44,53 +52,82 @@ public class BlockingShuffleITCase {
     private final int numSlotsPerTaskManager = 4;
 
+    @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
     @Test
     public void testBoundedBlockingShuffle() throws Exception {
-        JobGraph jobGraph = createJobGraph(1000000);
-        Configuration configuration = new Configuration();
+        JobGraph jobGraph = createJobGraph(1000000, false);
+        Configuration configuration = getConfiguration();
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
     @Test
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
-        JobGraph jobGraph = createJobGraph(0);
-        Configuration configuration = new Configuration();
+        JobGraph jobGraph = createJobGraph(0, false);
+        Configuration configuration = getConfiguration();
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
     @Test
     public void testSortMergeBlockingShuffle() throws Exception {
-        Configuration configuration = new Configuration();
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
 
-        JobGraph jobGraph = createJobGraph(1000000);
+        JobGraph jobGraph = createJobGraph(1000000, false);
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
     @Test
     public void testSortMergeBlockingShuffleWithoutData() throws Exception {
-        Configuration configuration = new Configuration();
+        Configuration configuration = getConfiguration();
         configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
 
-        JobGraph jobGraph = createJobGraph(0);
+        JobGraph jobGraph = createJobGraph(0, false);
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+    }
+
+    @Test
+    public void testDeletePartitionFileOfBoundedBlockingShuffle() throws Exception {
+        Configuration configuration = getConfiguration();
+        JobGraph jobGraph = createJobGraph(0, true);
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
 
-    private JobGraph createJobGraph(int numRecordsToSend) {
+    @Test
+    public void testDeletePartitionFileOfSortMergeBlockingShuffle() throws Exception {
+        Configuration configuration = getConfiguration();
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+
+        JobGraph jobGraph = createJobGraph(0, true);
+        JobGraphRunningUtil.execute(
+                jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
+    }
+
+    private Configuration getConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(CoreOptions.TMP_DIRS, TEMP_FOLDER.getRoot().getAbsolutePath());
+        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 100);
+        return configuration;
+    }
+
+    private JobGraph createJobGraph(int numRecordsToSend, boolean deletePartitionFile) {
         StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
         env.setBufferTimeout(-1);
         env.setParallelism(numTaskManagers * numSlotsPerTaskManager);
         DataStream<String> source = env.addSource(new StringSource(numRecordsToSend));
         source.rebalance()
                 .map((MapFunction<String, String>) value -> value)
                 .broadcast()
-                .addSink(new VerifySink());
+                .addSink(new VerifySink(deletePartitionFile));
 
         StreamGraph streamGraph = env.getStreamGraph();
         streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
@@ -122,11 +159,42 @@ public class BlockingShuffleITCase {
         }
     }
 
-    private static class VerifySink implements SinkFunction<String> {
+    private static class VerifySink extends RichSinkFunction<String> {
+        private final boolean deletePartitionFile;
+
+        VerifySink(boolean deletePartitionFile) {
+            this.deletePartitionFile = deletePartitionFile;
+        }
 
         @Override
-        public void invoke(String value) throws Exception {
+        public void open(Configuration parameters) throws Exception {
+            if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() > 0) {
+                return;
+            }
+
+            synchronized (BlockingShuffleITCase.class) {
+                deleteFiles(TEMP_FOLDER.getRoot());
+            }
+        }
+
+        @Override
+        public void invoke(String value, Context context) throws Exception {
             assertEquals(RECORD, value);
         }
+
+        private static void deleteFiles(File root) throws IOException {
+            File[] files = root.listFiles();
+            if (files == null || files.length == 0) {
+                return;
+            }
+
+            for (File file : files) {
+                if (!file.isDirectory()) {
+                    Files.deleteIfExists(file.toPath());
+                } else {
+                    deleteFiles(file);
+                }
+            }
+        }
     }
 }
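
One design note on the test: the guard in VerifySink#open ensures the
partition files are deleted only on the first execution attempt, so after the
induced PartitionNotFoundException the restarted job (fixedDelayRestart(1, 0L))
finds the regenerated data intact and can finish. Condensed from the diff
above:

    // Delete spilled partition files only on the first attempt; on the retry
    // triggered by PartitionNotFoundException, leave them intact so the job
    // can succeed.
    if (!deletePartitionFile || getRuntimeContext().getAttemptNumber() > 0) {
        return;
    }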