This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdbfb82ef3eaa242abf6d070463c0895ac244ef1
Author: Zhijiang <[email protected]>
AuthorDate: Wed Jun 19 12:36:54 2019 +0800

    [hotfix][runtime] Cleanup IOManager code
---
 .../runtime/io/disk/iomanager/FileIOChannel.java   | 78 ++++++++++----------
 .../flink/runtime/io/disk/iomanager/IOManager.java | 83 +++++++++++-----------
 .../IOManagerAsyncWithNoOpBufferFileWriter.java    | 53 --------------
 .../operators/sort/LargeRecordHandlerITCase.java   | 26 ++++---
 4 files changed, 91 insertions(+), 149 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index fd8e8e6..ef57e03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -18,93 +18,90 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.util.StringUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flink.util.StringUtils;
-
 /**
  * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
  * files that contain sorted runs of data from the same stream, that will later on be merged together.
  */
 public interface FileIOChannel {
-       
+
        /**
         * Gets the channel ID of this I/O channel.
-        * 
+        *
         * @return The channel ID.
         */
-       FileIOChannel.ID getChannelID();
-       
+       ID getChannelID();
+
        /**
         * Gets the size (in bytes) of the file underlying the channel.
-        * 
-        * @return The size (in bytes) of the file underlying the channel.
         */
        long getSize() throws IOException;
-       
+
        /**
         * Checks whether the channel has been closed.
-        * 
+        *
         * @return True if the channel has been closed, false otherwise.
         */
        boolean isClosed();
 
        /**
-       * Closes the channel. For asynchronous implementations, this method 
waits until all pending requests are
-       * handled. Even if an exception interrupts the closing, the underlying 
<tt>FileChannel</tt> is closed.
-       * 
-       * @throws IOException Thrown, if an error occurred while waiting for 
pending requests.
-       */
+        * Closes the channel. For asynchronous implementations, this method 
waits until all pending requests are
+        * handled. Even if an exception interrupts the closing, the underlying 
<tt>FileChannel</tt> is closed.
+        *
+        * @throws IOException Thrown, if an error occurred while waiting for 
pending requests.
+        */
        void close() throws IOException;
 
        /**
         * Deletes the file underlying this I/O channel.
-        *  
+        *
         * @throws IllegalStateException Thrown, when the channel is still open.
         */
        void deleteChannel();
-       
-       /**
-       * Closes the channel and deletes the underlying file.
-       * For asynchronous implementations, this method waits until all pending 
requests are handled;
-       * 
-       * @throws IOException Thrown, if an error occurred while waiting for 
pending requests.
-       */
-       public void closeAndDelete() throws IOException;
 
        FileChannel getNioFileChannel();
-       
+
+       /**
+        * Closes the channel and deletes the underlying file. For asynchronous 
implementations,
+        * this method waits until all pending requests are handled.
+        *
+        * @throws IOException Thrown, if an error occurred while waiting for 
pending requests.
+        */
+       void closeAndDelete() throws IOException;
+
        // 
--------------------------------------------------------------------------------------------
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
         * An ID identifying an underlying file channel.
         */
-       public static class ID {
-               
+       class ID {
+
                private static final int RANDOM_BYTES_LENGTH = 16;
-               
+
                private final File path;
-               
+
                private final int threadNum;
 
-               protected ID(File path, int threadNum) {
+               private ID(File path, int threadNum) {
                        this.path = path;
                        this.threadNum = threadNum;
                }
 
-               protected ID(File basePath, int threadNum, Random random) {
+               public ID(File basePath, int threadNum, Random random) {
                        this.path = new File(basePath, randomString(random) + 
".channel");
                        this.threadNum = threadNum;
                }
 
                /**
                 * Returns the path to the underlying temporary file.
-                * @return The path to the underlying temporary file..
                 */
                public String getPath() {
                        return path.getAbsolutePath();
@@ -112,12 +109,11 @@ public interface FileIOChannel {
 
                /**
                 * Returns the path to the underlying temporary file as a File.
-                * @return The path to the underlying temporary file as a File.
                 */
                public File getPathFile() {
                        return path;
                }
-               
+
                int getThreadNum() {
                        return this.threadNum;
                }
@@ -131,17 +127,17 @@ public interface FileIOChannel {
                                return false;
                        }
                }
-               
+
                @Override
                public int hashCode() {
                        return path.hashCode();
                }
-               
+
                @Override
                public String toString() {
                        return path.getAbsolutePath();
                }
-               
+
                private static String randomString(Random random) {
                        byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
                        random.nextBytes(bytes);
@@ -152,7 +148,7 @@ public interface FileIOChannel {
        /**
         * An enumerator for channels that logically belong together.
         */
-       public static final class Enumerator {
+       final class Enumerator {
 
                private static AtomicInteger globalCounter = new 
AtomicInteger();
 
@@ -162,7 +158,7 @@ public interface FileIOChannel {
 
                private int localCounter;
 
-               protected Enumerator(File[] basePaths, Random random) {
+               public Enumerator(File[] basePaths, Random random) {
                        this.paths = basePaths;
                        this.namePrefix = ID.randomString(random);
                        this.localCounter = 0;
@@ -177,4 +173,4 @@ public interface FileIOChannel {
                        return new ID(new File(paths[threadNum], filename), 
threadNum);
                }
        }
-}
\ No newline at end of file
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 0aaadf0..6723597 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.FileUtils;
 
@@ -36,10 +38,9 @@ import java.util.concurrent.LinkedBlockingQueue;
  * The facade for the provided I/O manager services.
  */
 public abstract class IOManager {
-       /** Logging */
        protected static final Logger LOG = 
LoggerFactory.getLogger(IOManager.class);
 
-       /** The temporary directories for files */
+       /** The temporary directories for files. */
        private final File[] paths;
 
        /** A random number generator for the anonymous ChannelIDs. */
@@ -120,41 +121,40 @@ public abstract class IOManager {
        // 
------------------------------------------------------------------------
 
        /**
-        * Creates a new {@link FileIOChannel.ID} in one of the temp 
directories. Multiple
-        * invocations of this method spread the channels evenly across the 
different directories.
+        * Creates a new {@link ID} in one of the temp directories. Multiple 
invocations of this
+        * method spread the channels evenly across the different directories.
         *
         * @return A channel to a temporary directory.
         */
-       public FileIOChannel.ID createChannel() {
+       public ID createChannel() {
                final int num = getNextPathNum();
-               return new FileIOChannel.ID(this.paths[num], num, this.random);
+               return new ID(this.paths[num], num, this.random);
        }
 
        /**
-        * Creates a new {@link FileIOChannel.Enumerator}, spreading the 
channels in a round-robin fashion
+        * Creates a new {@link Enumerator}, spreading the channels in a 
round-robin fashion
         * across the temporary file directories.
         *
         * @return An enumerator for channels.
         */
-       public FileIOChannel.Enumerator createChannelEnumerator() {
-               return new FileIOChannel.Enumerator(this.paths, this.random);
+       public Enumerator createChannelEnumerator() {
+               return new Enumerator(this.paths, this.random);
        }
 
        /**
         * Deletes the file underlying the given channel. If the channel is 
still open, this
         * call may fail.
-        * 
+        *
         * @param channel The channel to be deleted.
-        * @throws IOException Thrown if the deletion fails.
         */
-       public void deleteChannel(FileIOChannel.ID channel) throws IOException {
+       public static void deleteChannel(ID channel) {
                if (channel != null) {
                        if (channel.getPathFile().exists() && 
!channel.getPathFile().delete()) {
                                LOG.warn("IOManager failed to delete temporary 
file {}", channel.getPath());
                        }
                }
        }
-       
+
        // 
------------------------------------------------------------------------
        //                        Reader / Writer instantiations
        // 
------------------------------------------------------------------------
@@ -167,8 +167,8 @@ public abstract class IOManager {
         * @return A block channel writer that writes to the given channel.
         * @throws IOException Thrown, if the channel for the writer could not 
be opened.
         */
-       public BlockChannelWriter<MemorySegment> 
createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
-               return createBlockChannelWriter(channelID, new 
LinkedBlockingQueue<MemorySegment>());
+       public BlockChannelWriter<MemorySegment> createBlockChannelWriter(ID 
channelID) throws IOException {
+               return createBlockChannelWriter(channelID, new 
LinkedBlockingQueue<>());
        }
 
        /**
@@ -180,8 +180,9 @@ public abstract class IOManager {
         * @return A block channel writer that writes to the given channel.
         * @throws IOException Thrown, if the channel for the writer could not 
be opened.
         */
-       public abstract BlockChannelWriter<MemorySegment> 
createBlockChannelWriter(FileIOChannel.ID channelID,
-                               LinkedBlockingQueue<MemorySegment> returnQueue) 
throws IOException;
+       public abstract BlockChannelWriter<MemorySegment> 
createBlockChannelWriter(
+               ID channelID,
+               LinkedBlockingQueue<MemorySegment> returnQueue) throws 
IOException;
 
        /**
         * Creates a block channel writer that writes to the given channel. The 
writer calls the given callback
@@ -193,7 +194,9 @@ public abstract class IOManager {
         * @return A block channel writer that writes to the given channel.
         * @throws IOException Thrown, if the channel for the writer could not 
be opened.
         */
-       public abstract BlockChannelWriterWithCallback<MemorySegment> 
createBlockChannelWriter(FileIOChannel.ID channelID, 
RequestDoneCallback<MemorySegment> callback) throws IOException;
+       public abstract BlockChannelWriterWithCallback<MemorySegment> 
createBlockChannelWriter(
+               ID channelID,
+               RequestDoneCallback<MemorySegment> callback) throws IOException;
 
        /**
         * Creates a block channel reader that reads blocks from the given 
channel. The reader pushed
@@ -204,8 +207,8 @@ public abstract class IOManager {
         * @return A block channel reader that reads from the given channel.
         * @throws IOException Thrown, if the channel for the reader could not 
be opened.
         */
-       public BlockChannelReader<MemorySegment> 
createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
-               return createBlockChannelReader(channelID, new 
LinkedBlockingQueue<MemorySegment>());
+       public BlockChannelReader<MemorySegment> createBlockChannelReader(ID 
channelID) throws IOException {
+               return createBlockChannelReader(channelID, new 
LinkedBlockingQueue<>());
        }
 
        /**
@@ -217,22 +220,27 @@ public abstract class IOManager {
         * @return A block channel reader that reads from the given channel.
         * @throws IOException Thrown, if the channel for the reader could not 
be opened.
         */
-       public abstract BlockChannelReader<MemorySegment> 
createBlockChannelReader(FileIOChannel.ID channelID,
-                                                                               
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
+       public abstract BlockChannelReader<MemorySegment> 
createBlockChannelReader(
+               ID channelID,
+               LinkedBlockingQueue<MemorySegment> returnQueue) throws 
IOException;
 
-       public abstract BufferFileWriter 
createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;
+       public abstract BufferFileWriter createBufferFileWriter(ID channelID) 
throws IOException;
 
-       public abstract BufferFileReader 
createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> 
callback) throws IOException;
+       public abstract BufferFileReader createBufferFileReader(
+               ID channelID,
+               RequestDoneCallback<Buffer> callback) throws IOException;
 
-       public abstract BufferFileSegmentReader 
createBufferFileSegmentReader(FileIOChannel.ID channelID, 
RequestDoneCallback<FileSegment> callback) throws IOException;
+       public abstract BufferFileSegmentReader createBufferFileSegmentReader(
+               ID channelID,
+               RequestDoneCallback<FileSegment> callback) throws IOException;
 
        /**
         * Creates a block channel reader that reads all blocks from the given 
channel directly in one bulk.
         * The reader draws segments to read the blocks into from a supplied 
list, which must contain as many
         * segments as the channel has blocks. After the reader is done, the 
list with the full segments can be
         * obtained from the reader.
-        * <p>
-        * If a channel is not to be read in one bulk, but in multiple smaller 
batches, a
+        *
+        * <p>If a channel is not to be read in one bulk, but in multiple 
smaller batches, a
         * {@link BlockChannelReader} should be used.
         *
         * @param channelID The descriptor for the channel to write to.
@@ -241,26 +249,19 @@ public abstract class IOManager {
         * @return A block channel reader that reads from the given channel.
         * @throws IOException Thrown, if the channel for the reader could not 
be opened.
         */
-       public abstract BulkBlockChannelReader 
createBulkBlockChannelReader(FileIOChannel.ID channelID,
-                       List<MemorySegment> targetSegments, int numBlocks) 
throws IOException;
+       public abstract BulkBlockChannelReader createBulkBlockChannelReader(
+               ID channelID,
+               List<MemorySegment> targetSegments,
+               int numBlocks) throws IOException;
 
 
        // 
------------------------------------------------------------------------
        //                          Utilities
        // 
------------------------------------------------------------------------
-       
-       /**
-        * Gets the number of directories across which the I/O manager rotates 
its files.
-        * 
-        * @return The number of temporary file directories.
-        */
-       public int getNumberOfSpillingDirectories() {
-               return this.paths.length;
-       }
 
        /**
         * Gets the directories that the I/O manager spills to.
-        * 
+        *
         * @return The directories that the I/O manager spills to.
         */
        public File[] getSpillingDirectories() {
@@ -279,8 +280,8 @@ public abstract class IOManager {
                }
                return strings;
        }
-       
-       protected int getNextPathNum() {
+
+       private int getNextPathNum() {
                final int next = this.nextPath;
                final int newNext = next + 1;
                this.nextPath = newNext >= this.paths.length ? 0 : newNext;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
deleted file mode 100644
index 363e02b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-
-import java.io.IOException;
-
-/**
- * An {@link IOManagerAsync} that creates {@link BufferFileWriter} instances 
which do nothing in their {@link BufferFileWriter#writeBlock(Object)} method.
- *
- * <p>Beware: the passed {@link Buffer} instances must be cleaned up manually!
- */
-public class IOManagerAsyncWithNoOpBufferFileWriter extends IOManagerAsync {
-       @Override
-       public BufferFileWriter createBufferFileWriter(FileIOChannel.ID 
channelID)
-                       throws IOException {
-               return new NoOpAsynchronousBufferFileWriter(channelID, 
getWriteRequestQueue(channelID));
-       }
-
-       /**
-        * {@link BufferFileWriter} subclass with a no-op in {@link 
#writeBlock(Buffer)}.
-        */
-       private static class NoOpAsynchronousBufferFileWriter extends 
AsynchronousBufferFileWriter {
-
-               private NoOpAsynchronousBufferFileWriter(
-                               ID channelID,
-                               RequestQueue<WriteRequest> requestQueue) throws 
IOException {
-                       super(channelID, requestQueue);
-               }
-
-               @Override
-               public void writeBlock(Buffer buffer) throws IOException {
-                       // do nothing
-               }
-       }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
index 60d17bf..8f9e4dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java
@@ -18,16 +18,6 @@
 
 package org.apache.flink.runtime.operators.sort;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,10 +40,20 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.MutableObjectIterator;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class LargeRecordHandlerITCase extends TestLogger {
 
        @Test
@@ -262,9 +262,7 @@ public class LargeRecordHandlerITCase extends TestLogger {
                }
                finally {
                        if (channel != null) {
-                               try {
-                                       ioMan.deleteChannel(channel);
-                               } catch (IOException ignored) {}
+                               ioMan.deleteChannel(channel);
                        }
 
                        ioMan.shutdown();

Reply via email to