This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f9acd2ff317b4a6181e85ba50ddbe177573351d7 Author: Zhijiang <[email protected]> AuthorDate: Mon Jul 1 23:31:30 2019 +0800 [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager IOManager mainly has two roles. One is for managing file channels based on config temp dirs, and the other is for abstracting ways to read/write files. We could define a FileChannelManager class for handling the file channels which could be reused by the shuffle environment in the future. To do so the shuffle environment does not need to rely on the whole IOManager. --- .../flink/runtime/io/disk/FileChannelManager.java | 45 ++++++++ .../runtime/io/disk/FileChannelManagerImpl.java | 126 +++++++++++++++++++++ .../flink/runtime/io/disk/iomanager/IOManager.java | 119 ++++++------------- 3 files changed, 203 insertions(+), 87 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java new file mode 100644 index 0000000..22079db --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.disk; + +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; + +import java.io.File; + +/** + * The manager used for creating/getting file IO channels based on config temp dirs. + */ +public interface FileChannelManager extends AutoCloseable { + + /** + * Creates an ID identifying an underlying file channel and returns it. + */ + ID createChannel(); + + /** + * Creates an enumerator for channels that logically belong together and returns it. + */ + Enumerator createChannelEnumerator(); + + /** + * Gets all the files corresponding to the config temp dirs. + */ + File[] getPaths(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java new file mode 100644 index 0000000..2bdb8d9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.disk; + +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The manager used for creating/deleting file channels based on config temp dirs. + */ +public class FileChannelManagerImpl implements FileChannelManager { + private static final Logger LOG = LoggerFactory.getLogger(FileChannelManagerImpl.class); + + /** The temporary directories for files. */ + private final File[] paths; + + /** A random number generator for the anonymous Channel IDs. */ + private final Random random; + + /** The number of the next path to use. 
*/ + private volatile int nextPath; + + public FileChannelManagerImpl(String[] tempDirs, String prefix) { + checkNotNull(tempDirs, "The temporary directories must not be null."); + checkArgument(tempDirs.length > 0, "The temporary directories must not be empty."); + + this.random = new Random(); + this.nextPath = 0; + this.paths = createFiles(tempDirs, prefix); + } + + private static File[] createFiles(String[] tempDirs, String prefix) { + File[] files = new File[tempDirs.length]; + for (int i = 0; i < tempDirs.length; i++) { + File baseDir = new File(tempDirs[i]); + String subfolder = String.format("flink-%s-%s", prefix, UUID.randomUUID().toString()); + File storageDir = new File(baseDir, subfolder); + + if (!storageDir.exists() && !storageDir.mkdirs()) { + throw new RuntimeException( + "Could not create storage directory for FileChannelManager: " + storageDir.getAbsolutePath()); + } + files[i] = storageDir; + + LOG.info("FileChannelManager uses directory {} for spill files.", storageDir.getAbsolutePath()); + } + return files; + } + + @Override + public ID createChannel() { + int num = getNextPathNum(); + return new ID(paths[num], num, random); + } + + @Override + public Enumerator createChannelEnumerator() { + return new Enumerator(paths, random); + } + + @Override + public File[] getPaths() { + return Arrays.copyOf(paths, paths.length); + } + + /** + * Remove all the temp directories. 
+ */ + @Override + public void close() throws Exception { + IOUtils.closeAll(Arrays.stream(paths) + .filter(File::exists) + .map(FileChannelManagerImpl::getFileCloser) + .collect(Collectors.toList())); + } + + private static AutoCloseable getFileCloser(File path) { + return () -> { + try { + FileUtils.deleteDirectory(path); + LOG.info("FileChannelManager removed spill file directory {}", path.getAbsolutePath()); + } catch (IOException e) { + String errorMessage = String.format("FileChannelManager failed to properly clean up temp file directory: %s", path); + throw new IOException(errorMessage, e); + } + }; + } + + private int getNextPathNum() { + int next = nextPath; + int newNext = next + 1; + nextPath = newNext >= paths.length ? 0 : newNext; + return next; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index a649e42..1be8639 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -19,24 +19,20 @@ package org.apache.flink.runtime.io.disk.iomanager; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.FileChannelManager; +import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator; import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; -import org.apache.flink.util.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.util.Arrays; import java.util.List; -import java.util.Objects; -import java.util.Random; -import java.util.UUID; import 
java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Collectors; /** * The facade for the provided I/O manager services. @@ -44,14 +40,9 @@ import java.util.stream.Collectors; public abstract class IOManager implements AutoCloseable { protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class); - /** The temporary directories for files. */ - private final File[] paths; + private static final String DIR_NAME_PREFIX = "io"; - /** A random number generator for the anonymous ChannelIDs. */ - private final Random random; - - /** The number of the next path to use. */ - private volatile int nextPath; + private final FileChannelManager fileChannelManager; // ------------------------------------------------------------------------- // Constructors / Destructors @@ -63,26 +54,7 @@ public abstract class IOManager implements AutoCloseable { * @param tempDirs The basic directories for files underlying anonymous channels. */ protected IOManager(String[] tempDirs) { - if (tempDirs == null || tempDirs.length == 0) { - throw new IllegalArgumentException("The temporary directories must not be null or empty."); - } - - this.random = new Random(); - this.nextPath = 0; - - this.paths = new File[tempDirs.length]; - for (int i = 0; i < tempDirs.length; i++) { - File baseDir = new File(tempDirs[i]); - String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString()); - File storageDir = new File(baseDir, subfolder); - - if (!storageDir.exists() && !storageDir.mkdirs()) { - throw new RuntimeException( - "Could not create storage directory for IOManager: " + storageDir.getAbsolutePath()); - } - paths[i] = storageDir; - LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath()); - } + this.fileChannelManager = new FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX); } /** @@ -90,22 +62,7 @@ public abstract class IOManager implements AutoCloseable { */ @Override public void close() throws 
Exception { - IOUtils.closeAll(Arrays.stream(paths) - .filter(File::exists) - .map(IOManager::getFileCloser) - .collect(Collectors.toList())); - } - - private static AutoCloseable getFileCloser(File path) { - return () -> { - try { - FileUtils.deleteDirectory(path); - LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath()); - } catch (IOException e) { - String errorMessage = String.format("IOManager failed to properly clean up temp file directory: %s", path); - throw new IOException(errorMessage, e); - } - }; + fileChannelManager.close(); } // ------------------------------------------------------------------------ @@ -119,8 +76,7 @@ public abstract class IOManager implements AutoCloseable { * @return A channel to a temporary directory. */ public ID createChannel() { - final int num = getNextPathNum(); - return new ID(this.paths[num], num, this.random); + return fileChannelManager.createChannel(); } /** @@ -130,7 +86,7 @@ public abstract class IOManager implements AutoCloseable { * @return An enumerator for channels. */ public Enumerator createChannelEnumerator() { - return new Enumerator(this.paths, this.random); + return fileChannelManager.createChannelEnumerator(); } /** @@ -147,6 +103,29 @@ public abstract class IOManager implements AutoCloseable { } } + /** + * Gets the directories that the I/O manager spills to. + * + * @return The directories that the I/O manager spills to. + */ + public File[] getSpillingDirectories() { + return fileChannelManager.getPaths(); + } + + /** + * Gets the directories that the I/O manager spills to, as path strings. + * + * @return The directories that the I/O manager spills to, as path strings. 
+ */ + public String[] getSpillingDirectoriesPaths() { + File[] paths = fileChannelManager.getPaths(); + String[] strings = new String[paths.length]; + for (int i = 0; i < strings.length; i++) { + strings[i] = paths[i].getAbsolutePath(); + } + return strings; + } + // ------------------------------------------------------------------------ // Reader / Writer instantiations // ------------------------------------------------------------------------ @@ -245,38 +224,4 @@ public abstract class IOManager implements AutoCloseable { ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException; - - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - /** - * Gets the directories that the I/O manager spills to. - * - * @return The directories that the I/O manager spills to. - */ - public File[] getSpillingDirectories() { - return this.paths; - } - - /** - * Gets the directories that the I/O manager spills to, as path strings. - * - * @return The directories that the I/O manager spills to, as path strings. - */ - public String[] getSpillingDirectoriesPaths() { - String[] strings = new String[this.paths.length]; - for (int i = 0; i < strings.length; i++) { - strings[i] = paths[i].getAbsolutePath(); - } - return strings; - } - - private int getNextPathNum() { - final int next = this.nextPath; - final int newNext = next + 1; - this.nextPath = newNext >= this.paths.length ? 0 : newNext; - return next; - } }
