This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cdbfb82ef3eaa242abf6d070463c0895ac244ef1 Author: Zhijiang <[email protected]> AuthorDate: Wed Jun 19 12:36:54 2019 +0800 [hotfix][runtime] Cleanup IOManager code --- .../runtime/io/disk/iomanager/FileIOChannel.java | 78 ++++++++++---------- .../flink/runtime/io/disk/iomanager/IOManager.java | 83 +++++++++++----------- .../IOManagerAsyncWithNoOpBufferFileWriter.java | 53 -------------- .../operators/sort/LargeRecordHandlerITCase.java | 26 ++++--- 4 files changed, 91 insertions(+), 149 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java index fd8e8e6..ef57e03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java @@ -18,93 +18,90 @@ package org.apache.flink.runtime.io.disk.iomanager; +import org.apache.flink.util.StringUtils; + import java.io.File; import java.io.IOException; import java.nio.channels.FileChannel; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.flink.util.StringUtils; - /** * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of * files that contain sorted runs of data from the same stream, that will later on be merged together. */ public interface FileIOChannel { - + /** * Gets the channel ID of this I/O channel. - * + * * @return The channel ID. */ - FileIOChannel.ID getChannelID(); - + ID getChannelID(); + /** * Gets the size (in bytes) of the file underlying the channel. - * - * @return The size (in bytes) of the file underlying the channel. */ long getSize() throws IOException; - + /** * Checks whether the channel has been closed. - * + * * @return True if the channel has been closed, false otherwise. */ boolean isClosed(); /** - * Closes the channel. For asynchronous implementations, this method waits until all pending requests are - * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed. - * - * @throws IOException Thrown, if an error occurred while waiting for pending requests. - */ + * Closes the channel. For asynchronous implementations, this method waits until all pending requests are + * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed. + * + * @throws IOException Thrown, if an error occurred while waiting for pending requests. + */ void close() throws IOException; /** * Deletes the file underlying this I/O channel. - * + * * @throws IllegalStateException Thrown, when the channel is still open. */ void deleteChannel(); - - /** - * Closes the channel and deletes the underlying file. - * For asynchronous implementations, this method waits until all pending requests are handled; - * - * @throws IOException Thrown, if an error occurred while waiting for pending requests. - */ - public void closeAndDelete() throws IOException; FileChannel getNioFileChannel(); - + + /** + * Closes the channel and deletes the underlying file. For asynchronous implementations, + * this method waits until all pending requests are handled. + * + * @throws IOException Thrown, if an error occurred while waiting for pending requests. + */ + void closeAndDelete() throws IOException; + // -------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------- - + /** * An ID identifying an underlying file channel. */ - public static class ID { - + class ID { + private static final int RANDOM_BYTES_LENGTH = 16; - + private final File path; - + private final int threadNum; - protected ID(File path, int threadNum) { + private ID(File path, int threadNum) { this.path = path; this.threadNum = threadNum; } - protected ID(File basePath, int threadNum, Random random) { + public ID(File basePath, int threadNum, Random random) { this.path = new File(basePath, randomString(random) + ".channel"); this.threadNum = threadNum; } /** * Returns the path to the underlying temporary file. - * @return The path to the underlying temporary file.. */ public String getPath() { return path.getAbsolutePath(); @@ -112,12 +109,11 @@ public interface FileIOChannel { /** * Returns the path to the underlying temporary file as a File. - * @return The path to the underlying temporary file as a File. */ public File getPathFile() { return path; } - + int getThreadNum() { return this.threadNum; } @@ -131,17 +127,17 @@ public interface FileIOChannel { return false; } } - + @Override public int hashCode() { return path.hashCode(); } - + @Override public String toString() { return path.getAbsolutePath(); } - + private static String randomString(Random random) { byte[] bytes = new byte[RANDOM_BYTES_LENGTH]; random.nextBytes(bytes); @@ -152,7 +148,7 @@ public interface FileIOChannel { /** * An enumerator for channels that logically belong together. */ - public static final class Enumerator { + final class Enumerator { private static AtomicInteger globalCounter = new AtomicInteger(); @@ -162,7 +158,7 @@ public interface FileIOChannel { private int localCounter; - protected Enumerator(File[] basePaths, Random random) { + public Enumerator(File[] basePaths, Random random) { this.paths = basePaths; this.namePrefix = ID.randomString(random); this.localCounter = 0; @@ -177,4 +173,4 @@ public interface FileIOChannel { return new ID(new File(paths[threadNum], filename), threadNum); } } -} \ No newline at end of file +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index 0aaadf0..6723597 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.io.disk.iomanager; import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.Enumerator; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.util.FileUtils; @@ -36,10 +38,9 @@ import java.util.concurrent.LinkedBlockingQueue; * The facade for the provided I/O manager services. */ public abstract class IOManager { - /** Logging */ protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class); - /** The temporary directories for files */ + /** The temporary directories for files. */ private final File[] paths; /** A random number generator for the anonymous ChannelIDs. */ @@ -120,41 +121,40 @@ public abstract class IOManager { // ------------------------------------------------------------------------ /** - * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple - * invocations of this method spread the channels evenly across the different directories. + * Creates a new {@link ID} in one of the temp directories. Multiple invocations of this + * method spread the channels evenly across the different directories. * * @return A channel to a temporary directory. */ - public FileIOChannel.ID createChannel() { + public ID createChannel() { final int num = getNextPathNum(); - return new FileIOChannel.ID(this.paths[num], num, this.random); + return new ID(this.paths[num], num, this.random); } /** - * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion + * Creates a new {@link Enumerator}, spreading the channels in a round-robin fashion * across the temporary file directories. * * @return An enumerator for channels. */ - public FileIOChannel.Enumerator createChannelEnumerator() { - return new FileIOChannel.Enumerator(this.paths, this.random); + public Enumerator createChannelEnumerator() { + return new Enumerator(this.paths, this.random); } /** * Deletes the file underlying the given channel. If the channel is still open, this * call may fail. - * + * * @param channel The channel to be deleted. - * @throws IOException Thrown if the deletion fails. */ - public void deleteChannel(FileIOChannel.ID channel) throws IOException { + public static void deleteChannel(ID channel) { if (channel != null) { if (channel.getPathFile().exists() && !channel.getPathFile().delete()) { LOG.warn("IOManager failed to delete temporary file {}", channel.getPath()); } } } - + // ------------------------------------------------------------------------ // Reader / Writer instantiations // ------------------------------------------------------------------------ @@ -167,8 +167,8 @@ public abstract class IOManager { * @return A block channel writer that writes to the given channel. * @throws IOException Thrown, if the channel for the writer could not be opened. */ - public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException { - return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>()); + public BlockChannelWriter<MemorySegment> createBlockChannelWriter(ID channelID) throws IOException { + return createBlockChannelWriter(channelID, new LinkedBlockingQueue<>()); } /** @@ -180,8 +180,9 @@ public abstract class IOManager { * @return A block channel writer that writes to the given channel. * @throws IOException Thrown, if the channel for the writer could not be opened. */ - public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, - LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException; + public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter( + ID channelID, + LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException; /** * Creates a block channel writer that writes to the given channel. The writer calls the given callback @@ -193,7 +194,9 @@ public abstract class IOManager { * @return A block channel writer that writes to the given channel. * @throws IOException Thrown, if the channel for the writer could not be opened. */ - public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException; + public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter( + ID channelID, + RequestDoneCallback<MemorySegment> callback) throws IOException; /** * Creates a block channel reader that reads blocks from the given channel. The reader pushed @@ -204,8 +207,8 @@ public abstract class IOManager { * @return A block channel reader that reads from the given channel. * @throws IOException Thrown, if the channel for the reader could not be opened. */ - public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID) throws IOException { - return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>()); + public BlockChannelReader<MemorySegment> createBlockChannelReader(ID channelID) throws IOException { + return createBlockChannelReader(channelID, new LinkedBlockingQueue<>()); } /** @@ -217,22 +220,27 @@ public abstract class IOManager { * @return A block channel reader that reads from the given channel. * @throws IOException Thrown, if the channel for the reader could not be opened. */ - public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID, - LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException; + public abstract BlockChannelReader<MemorySegment> createBlockChannelReader( + ID channelID, + LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException; - public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException; + public abstract BufferFileWriter createBufferFileWriter(ID channelID) throws IOException; - public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException; + public abstract BufferFileReader createBufferFileReader( + ID channelID, + RequestDoneCallback<Buffer> callback) throws IOException; - public abstract BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException; + public abstract BufferFileSegmentReader createBufferFileSegmentReader( + ID channelID, + RequestDoneCallback<FileSegment> callback) throws IOException; /** * Creates a block channel reader that reads all blocks from the given channel directly in one bulk. * The reader draws segments to read the blocks into from a supplied list, which must contain as many * segments as the channel has blocks. After the reader is done, the list with the full segments can be * obtained from the reader. - * <p> - * If a channel is not to be read in one bulk, but in multiple smaller batches, a + * + * <p>If a channel is not to be read in one bulk, but in multiple smaller batches, a * {@link BlockChannelReader} should be used. * * @param channelID The descriptor for the channel to write to. @@ -241,26 +249,19 @@ public abstract class IOManager { * @return A block channel reader that reads from the given channel. * @throws IOException Thrown, if the channel for the reader could not be opened. */ - public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, - List<MemorySegment> targetSegments, int numBlocks) throws IOException; + public abstract BulkBlockChannelReader createBulkBlockChannelReader( + ID channelID, + List<MemorySegment> targetSegments, + int numBlocks) throws IOException; // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ - - /** - * Gets the number of directories across which the I/O manager rotates its files. - * - * @return The number of temporary file directories. - */ - public int getNumberOfSpillingDirectories() { - return this.paths.length; - } /** * Gets the directories that the I/O manager spills to. - * + * * @return The directories that the I/O manager spills to. */ public File[] getSpillingDirectories() { @@ -279,8 +280,8 @@ public abstract class IOManager { } return strings; } - - protected int getNextPathNum() { + + private int getNextPathNum() { final int next = this.nextPath; final int newNext = next + 1; this.nextPath = newNext >= this.paths.length ? 0 : newNext; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java deleted file mode 100644 index 363e02b..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncWithNoOpBufferFileWriter.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.disk.iomanager; - -import org.apache.flink.runtime.io.network.buffer.Buffer; - -import java.io.IOException; - -/** - * An {@link IOManagerAsync} that creates {@link BufferFileWriter} instances which do nothing in their {@link BufferFileWriter#writeBlock(Object)} method. - * - * <p>Beware: the passed {@link Buffer} instances must be cleaned up manually! - */ -public class IOManagerAsyncWithNoOpBufferFileWriter extends IOManagerAsync { - @Override - public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) - throws IOException { - return new NoOpAsynchronousBufferFileWriter(channelID, getWriteRequestQueue(channelID)); - } - - /** - * {@link BufferFileWriter} subclass with a no-op in {@link #writeBlock(Buffer)}. - */ - private static class NoOpAsynchronousBufferFileWriter extends AsynchronousBufferFileWriter { - - private NoOpAsynchronousBufferFileWriter( - ID channelID, - RequestQueue<WriteRequest> requestQueue) throws IOException { - super(channelID, requestQueue); - } - - @Override - public void writeBlock(Buffer buffer) throws IOException { - // do nothing - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java index 60d17bf..8f9e4dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/LargeRecordHandlerITCase.java @@ -18,16 +18,6 @@ package org.apache.flink.runtime.operators.sort; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -50,10 +40,20 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.types.Value; import org.apache.flink.util.MutableObjectIterator; - import org.apache.flink.util.TestLogger; + import org.junit.Test; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class LargeRecordHandlerITCase extends TestLogger { @Test @@ -262,9 +262,7 @@ public class LargeRecordHandlerITCase extends TestLogger { } finally { if (channel != null) { - try { - ioMan.deleteChannel(channel); - } catch (IOException ignored) {} + ioMan.deleteChannel(channel); } ioMan.shutdown();
