azagrebin commented on a change in pull request #8646: [FLINK-12735][network]
Make shuffle environment implementation independent with IOManager
URL: https://github.com/apache/flink/pull/8646#discussion_r294223053
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
##########
@@ -272,17 +231,91 @@ public abstract BulkBlockChannelReader
createBulkBlockChannelReader(
* @return The directories that the I/O manager spills to, as path
strings.
*/
public String[] getSpillingDirectoriesPaths() {
- String[] strings = new String[this.paths.length];
+ File[] paths = fileChannelManager.getPaths();
+ String[] strings = new String[paths.length];
for (int i = 0; i < strings.length; i++) {
strings[i] = paths[i].getAbsolutePath();
}
return strings;
}
- private int getNextPathNum() {
- final int next = this.nextPath;
- final int newNext = next + 1;
- this.nextPath = newNext >= this.paths.length ? 0 : newNext;
- return next;
+ /**
+ * The manager used for creating/deleting file channels based on config
temp dirs.
+ */
+ private static class FileChannelManager {
+ /** The temporary directories for files. */
+ private final File[] paths;
+
+ /** A random number generator for the anonymous ChannelIDs. */
+ private final Random random = new Random();
+
+ /** The number of the next path to use. */
+ private volatile int nextPath = 0;
+
+ private FileChannelManager(String[] tempDirs) {
+ if (tempDirs == null || tempDirs.length == 0) {
+ throw new IllegalArgumentException("The
temporary directories must not be null or empty.");
+ }
+
+ this.paths = new File[tempDirs.length];
+ for (int i = 0; i < tempDirs.length; i++) {
+ File baseDir = new File(tempDirs[i]);
+ String subfolder = String.format("flink-io-%s",
UUID.randomUUID().toString());
+ File storageDir = new File(baseDir, subfolder);
+
+ if (!storageDir.exists() &&
!storageDir.mkdirs()) {
+ throw new RuntimeException(
+ "Could not create storage
directory for FileChannelManager: " + storageDir.getAbsolutePath());
+ }
+ paths[i] = storageDir;
+
+ LOG.info("FileChannelManager uses directory {}
for spill files.", storageDir.getAbsolutePath());
+ }
+ }
+
+ private FileIOChannel.ID createChannel() {
+ int num = getNextPathNum();
+ return new FileIOChannel.ID(paths[num], num, random);
+ }
+
+ private FileIOChannel.Enumerator createChannelEnumerator() {
+ return new FileIOChannel.Enumerator(paths, random);
+ }
+
+ private int getNextPathNum() {
+ int next = nextPath;
+ int newNext = next + 1;
+ nextPath = newNext >= paths.length ? 0 : newNext;
+ return next;
+ }
+
+ private File[] getPaths() {
+ return paths;
+ }
+
+ private void shutdown() {
+ // remove all the temp directories
+ for (File path : paths) {
+ try {
+ if (path != null) {
+ if (path.exists()) {
+
FileUtils.deleteDirectory(path);
+
LOG.info("FileChannelManager removed spill file directory {}",
path.getAbsolutePath());
+ }
+ }
+ } catch (Throwable t) {
Review comment:
too broad exception catch, only `IOException` is expected, others are
unexpected and should result in the fast global failure.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services