This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9acd2ff317b4a6181e85ba50ddbe177573351d7
Author: Zhijiang <[email protected]>
AuthorDate: Mon Jul 1 23:31:30 2019 +0800

    [FLINK-12735][network] Refactor IOManager to introduce FileChannelManager
    
    IOManager mainly has two roles. One is managing file channels based on the
    configured temp dirs, and the other is abstracting the ways to read/write files.
    We can define a FileChannelManager class for handling the file channels, which
    could be reused by the shuffle environment in the future. That way the shuffle
    environment does not need to rely on the whole IOManager.
---
 .../flink/runtime/io/disk/FileChannelManager.java  |  45 ++++++++
 .../runtime/io/disk/FileChannelManagerImpl.java    | 126 +++++++++++++++++++++
 .../flink/runtime/io/disk/iomanager/IOManager.java | 119 ++++++-------------
 3 files changed, 203 insertions(+), 87 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
new file mode 100644
index 0000000..22079db
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManager.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+
+import java.io.File;
+
+/**
+ * The manager used for creating/getting file IO channels based on config temp dirs.
+ */
+public interface FileChannelManager extends AutoCloseable {
+
+       /**
+        * Creates an ID identifying an underlying file channel and returns it.
+        */
+       ID createChannel();
+
+       /**
+        * Creates an enumerator for channels that logically belong together and returns it.
+        */
+       Enumerator createChannelEnumerator();
+
+       /**
+        * Gets all the files corresponding to the config temp dirs.
+        */
+       File[] getPaths();
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
new file mode 100644
index 0000000..2bdb8d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelManagerImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The manager used for creating/deleting file channels based on config temp dirs.
+ */
+public class FileChannelManagerImpl implements FileChannelManager {
+       private static final Logger LOG = 
LoggerFactory.getLogger(FileChannelManagerImpl.class);
+
+       /** The temporary directories for files. */
+       private final File[] paths;
+
+       /** A random number generator for the anonymous Channel IDs. */
+       private final Random random;
+
+       /** The number of the next path to use. */
+       private volatile int nextPath;
+
+       public FileChannelManagerImpl(String[] tempDirs, String prefix) {
+               checkNotNull(tempDirs, "The temporary directories must not be 
null.");
+               checkArgument(tempDirs.length > 0, "The temporary directories 
must not be empty.");
+
+               this.random = new Random();
+               this.nextPath = 0;
+               this.paths = createFiles(tempDirs, prefix);
+       }
+
+       private static File[] createFiles(String[] tempDirs, String prefix) {
+               File[] files = new File[tempDirs.length];
+               for (int i = 0; i < tempDirs.length; i++) {
+                       File baseDir = new File(tempDirs[i]);
+                       String subfolder = String.format("flink-%s-%s", prefix, 
UUID.randomUUID().toString());
+                       File storageDir = new File(baseDir, subfolder);
+
+                       if (!storageDir.exists() && !storageDir.mkdirs()) {
+                               throw new RuntimeException(
+                                       "Could not create storage directory for 
FileChannelManager: " + storageDir.getAbsolutePath());
+                       }
+                       files[i] = storageDir;
+
+                       LOG.info("FileChannelManager uses directory {} for 
spill files.", storageDir.getAbsolutePath());
+               }
+               return files;
+       }
+
+       @Override
+       public ID createChannel() {
+               int num = getNextPathNum();
+               return new ID(paths[num], num, random);
+       }
+
+       @Override
+       public Enumerator createChannelEnumerator() {
+               return new Enumerator(paths, random);
+       }
+
+       @Override
+       public File[] getPaths() {
+               return Arrays.copyOf(paths, paths.length);
+       }
+
+       /**
+        * Remove all the temp directories.
+        */
+       @Override
+       public void close() throws Exception {
+               IOUtils.closeAll(Arrays.stream(paths)
+                       .filter(File::exists)
+                       .map(FileChannelManagerImpl::getFileCloser)
+                       .collect(Collectors.toList()));
+       }
+
+       private static AutoCloseable getFileCloser(File path) {
+               return () -> {
+                       try {
+                               FileUtils.deleteDirectory(path);
+                               LOG.info("FileChannelManager removed spill file 
directory {}", path.getAbsolutePath());
+                       } catch (IOException e) {
+                               String errorMessage = 
String.format("FileChannelManager failed to properly clean up temp file 
directory: %s", path);
+                               throw new IOException(errorMessage, e);
+                       }
+               };
+       }
+
+       private int getNextPathNum() {
+               int next = nextPath;
+               int newNext = next + 1;
+               nextPath = newNext >= paths.length ? 0 : newNext;
+               return next;
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index a649e42..1be8639 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -19,24 +19,20 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
-import java.util.Random;
-import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
 
 /**
  * The facade for the provided I/O manager services.
@@ -44,14 +40,9 @@ import java.util.stream.Collectors;
 public abstract class IOManager implements AutoCloseable {
        protected static final Logger LOG = 
LoggerFactory.getLogger(IOManager.class);
 
-       /** The temporary directories for files. */
-       private final File[] paths;
+       private static final String DIR_NAME_PREFIX = "io";
 
-       /** A random number generator for the anonymous ChannelIDs. */
-       private final Random random;
-
-       /** The number of the next path to use. */
-       private volatile int nextPath;
+       private final FileChannelManager fileChannelManager;
 
        // 
-------------------------------------------------------------------------
        //               Constructors / Destructors
@@ -63,26 +54,7 @@ public abstract class IOManager implements AutoCloseable {
         * @param tempDirs The basic directories for files underlying anonymous 
channels.
         */
        protected IOManager(String[] tempDirs) {
-               if (tempDirs == null || tempDirs.length == 0) {
-                       throw new IllegalArgumentException("The temporary 
directories must not be null or empty.");
-               }
-
-               this.random = new Random();
-               this.nextPath = 0;
-
-               this.paths = new File[tempDirs.length];
-               for (int i = 0; i < tempDirs.length; i++) {
-                       File baseDir = new File(tempDirs[i]);
-                       String subfolder = String.format("flink-io-%s", 
UUID.randomUUID().toString());
-                       File storageDir = new File(baseDir, subfolder);
-
-                       if (!storageDir.exists() && !storageDir.mkdirs()) {
-                               throw new RuntimeException(
-                                               "Could not create storage 
directory for IOManager: " + storageDir.getAbsolutePath());
-                       }
-                       paths[i] = storageDir;
-                       LOG.info("I/O manager uses directory {} for spill 
files.", storageDir.getAbsolutePath());
-               }
+               this.fileChannelManager = new 
FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX);
        }
 
        /**
@@ -90,22 +62,7 @@ public abstract class IOManager implements AutoCloseable {
         */
        @Override
        public void close() throws Exception {
-               IOUtils.closeAll(Arrays.stream(paths)
-                       .filter(File::exists)
-                       .map(IOManager::getFileCloser)
-                       .collect(Collectors.toList()));
-       }
-
-       private static AutoCloseable getFileCloser(File path) {
-               return () -> {
-                       try {
-                               FileUtils.deleteDirectory(path);
-                               LOG.info("I/O manager removed spill file 
directory {}", path.getAbsolutePath());
-                       } catch (IOException e) {
-                               String errorMessage = String.format("IOManager 
failed to properly clean up temp file directory: %s", path);
-                               throw new IOException(errorMessage, e);
-                       }
-               };
+               fileChannelManager.close();
        }
 
        // 
------------------------------------------------------------------------
@@ -119,8 +76,7 @@ public abstract class IOManager implements AutoCloseable {
         * @return A channel to a temporary directory.
         */
        public ID createChannel() {
-               final int num = getNextPathNum();
-               return new ID(this.paths[num], num, this.random);
+               return fileChannelManager.createChannel();
        }
 
        /**
@@ -130,7 +86,7 @@ public abstract class IOManager implements AutoCloseable {
         * @return An enumerator for channels.
         */
        public Enumerator createChannelEnumerator() {
-               return new Enumerator(this.paths, this.random);
+               return fileChannelManager.createChannelEnumerator();
        }
 
        /**
@@ -147,6 +103,29 @@ public abstract class IOManager implements AutoCloseable {
                }
        }
 
+       /**
+        * Gets the directories that the I/O manager spills to.
+        *
+        * @return The directories that the I/O manager spills to.
+        */
+       public File[] getSpillingDirectories() {
+               return fileChannelManager.getPaths();
+       }
+
+       /**
+        * Gets the directories that the I/O manager spills to, as path strings.
+        *
+        * @return The directories that the I/O manager spills to, as path strings.
+        */
+       public String[] getSpillingDirectoriesPaths() {
+               File[] paths = fileChannelManager.getPaths();
+               String[] strings = new String[paths.length];
+               for (int i = 0; i < strings.length; i++) {
+                       strings[i] = paths[i].getAbsolutePath();
+               }
+               return strings;
+       }
+
        // 
------------------------------------------------------------------------
        //                        Reader / Writer instantiations
        // 
------------------------------------------------------------------------
@@ -245,38 +224,4 @@ public abstract class IOManager implements AutoCloseable {
                ID channelID,
                List<MemorySegment> targetSegments,
                int numBlocks) throws IOException;
-
-
-       // 
------------------------------------------------------------------------
-       //                          Utilities
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the directories that the I/O manager spills to.
-        *
-        * @return The directories that the I/O manager spills to.
-        */
-       public File[] getSpillingDirectories() {
-               return this.paths;
-       }
-
-       /**
-        * Gets the directories that the I/O manager spills to, as path strings.
-        *
-        * @return The directories that the I/O manager spills to, as path 
strings.
-        */
-       public String[] getSpillingDirectoriesPaths() {
-               String[] strings = new String[this.paths.length];
-               for (int i = 0; i < strings.length; i++) {
-                       strings[i] = paths[i].getAbsolutePath();
-               }
-               return strings;
-       }
-
-       private int getNextPathNum() {
-               final int next = this.nextPath;
-               final int newNext = next + 1;
-               this.nextPath = newNext >= this.paths.length ? 0 : newNext;
-               return next;
-       }
 }

Reply via email to