[FLINK-1296] [runtime] Add better paged disk I/O readers / writers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/996d404c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/996d404c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/996d404c Branch: refs/heads/master Commit: 996d404ced9347aaa00de4356c07333d46322eae Parents: 7610588 Author: Stephan Ewen <[email protected]> Authored: Mon Dec 1 20:21:25 2014 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Jan 21 12:01:35 2015 +0100 ---------------------------------------------------------------------- .../runtime/io/disk/FileChannelInputView.java | 148 ++++++++ .../runtime/io/disk/FileChannelOutputView.java | 144 ++++++++ .../io/disk/SeekableFileChannelInputView.java | 186 ++++++++++ .../disk/iomanager/AbstractFileIOChannel.java | 13 +- .../disk/iomanager/AsynchronousBlockReader.java | 7 +- .../iomanager/AsynchronousFileIOChannel.java | 18 +- .../io/disk/iomanager/BlockChannelReader.java | 7 + .../io/disk/iomanager/FileIOChannel.java | 9 +- .../io/disk/iomanager/IOManagerAsync.java | 14 +- .../runtime/memorymanager/MemoryManager.java | 3 +- .../io/disk/FileChannelStreamsITCase.java | 307 ++++++++++++++++ .../runtime/io/disk/FileChannelStreamsTest.java | 119 ++++++ .../disk/SeekableFileChannelInputViewTest.java | 157 ++++++++ .../AsynchronousFileIOChannelsTest.java | 175 +++++++++ .../io/disk/iomanager/IOManagerAsyncTest.java | 359 +++++++++++++++++++ .../io/disk/iomanager/IOManagerITCase.java | 66 +--- .../io/disk/iomanager/IOManagerTest.java | 237 +++--------- .../operators/testutils/PairGenerator.java | 161 +++++++++ 18 files changed, 1872 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java new file mode 100644 index 0000000..9fb8072 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.memorymanager.AbstractPagedInputView; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.util.MathUtils; + +/** + * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a {@link BlockChannelReader}, + * making it effectively a data input stream. The view reads its data in blocks from the underlying channel. 
+ * The view can read data that has been written by a {@link FileChannelOutputView}, or that was written in blocks + * in another fashion. + */ +public class FileChannelInputView extends AbstractPagedInputView { + + private final BlockChannelReader reader; + + private final MemoryManager memManager; + + private final List<MemorySegment> memory; + + private final int sizeOfLastBlock; + + private int numRequestsRemaining; + + private int numBlocksRemaining; + + // -------------------------------------------------------------------------------------------- + + public FileChannelInputView(BlockChannelReader reader, MemoryManager memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException { + super(0); + + checkNotNull(reader); + checkNotNull(memManager); + checkNotNull(memory); + checkArgument(!reader.isClosed()); + checkArgument(memory.size() > 0); + + this.reader = reader; + this.memManager = memManager; + this.memory = memory; + this.sizeOfLastBlock = sizeOfLastBlock; + + try { + final long channelLength = reader.getSize(); + final int segmentSize = memManager.getPageSize(); + + this.numBlocksRemaining = MathUtils.checkedDownCast(channelLength / segmentSize); + if (channelLength % segmentSize != 0) { + this.numBlocksRemaining++; + } + + this.numRequestsRemaining = numBlocksRemaining; + + for (int i = 0; i < memory.size(); i++) { + sendReadRequest(memory.get(i)); + } + + advance(); + } + catch (IOException e) { + memManager.release(memory); + throw e; + } + } + + public void close() throws IOException { + close(false); + } + + public void closeAndDelete() throws IOException { + close(true); + } + + private void close(boolean deleteFile) throws IOException { + try { + clear(); + if (deleteFile) { + reader.closeAndDelete(); + } else { + reader.close(); + } + } finally { + synchronized (memory) { + memManager.release(memory); + memory.clear(); + } + } + } + + @Override + protected MemorySegment nextSegment(MemorySegment current) throws IOException { 
+ // check for end-of-stream + if (numBlocksRemaining <= 0) { + reader.close(); + throw new EOFException(); + } + + // send a request first. if we have only a single segment, this same segment will be the one obtained in the next lines + if (current != null) { + sendReadRequest(current); + } + + // get the next segment + numBlocksRemaining--; + return reader.getNextReturnedSegment(); + } + + @Override + protected int getLimitForSegment(MemorySegment segment) { + return numBlocksRemaining > 0 ? segment.size() : sizeOfLastBlock; + } + + private void sendReadRequest(MemorySegment seg) throws IOException { + if (numRequestsRemaining > 0) { + reader.readBlock(seg); + numRequestsRemaining--; + } else { + memManager.release(seg); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java new file mode 100644 index 0000000..2b8b728 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.List; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView; +import org.apache.flink.runtime.memorymanager.MemoryManager; + +/** + * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link BlockChannelWriter}, making it effectively a data output + * stream. The view writes its data in blocks to the underlying channel. 
+ */ +public class FileChannelOutputView extends AbstractPagedOutputView { + + private final BlockChannelWriter writer; // the writer to the channel + + private final MemoryManager memManager; + + private final List<MemorySegment> memory; + + private int numBlocksWritten; + + private int bytesInLatestSegment; + + // -------------------------------------------------------------------------------------------- + + public FileChannelOutputView(BlockChannelWriter writer, MemoryManager memManager, List<MemorySegment> memory, int segmentSize) throws IOException { + super(segmentSize, 0); + + checkNotNull(writer); + checkNotNull(memManager); + checkNotNull(memory); + checkArgument(!writer.isClosed()); + + this.writer = writer; + this.memManager = memManager; + this.memory = memory; + + + for (MemorySegment next : memory) { + writer.getReturnQueue().add(next); + } + + // move to the first page + advance(); + } + + // -------------------------------------------------------------------------------------------- + + /** + * Closes this output, writing pending data and releasing the memory. + * + * @throws IOException Thrown, if the pending data could not be written. + */ + public void close() throws IOException { + close(false); + } + + /** + * Closes this output, writing pending data and releasing the memory. + * + * @throws IOException Thrown, if the pending data could not be written. 
+ */ + public void closeAndDelete() throws IOException { + close(true); + } + + private void close(boolean delete) throws IOException { + try { + // send off set last segment, if we have not been closed before + MemorySegment current = getCurrentSegment(); + if (current != null) { + writeSegment(current, getCurrentPositionInSegment()); + } + + clear(); + if (delete) { + writer.closeAndDelete(); + } else { + writer.close(); + } + } + finally { + memManager.release(memory); + } + } + + // -------------------------------------------------------------------------------------------- + + /** + * Gets the number of blocks written by this output view. + * + * @return The number of blocks written by this output view. + */ + public int getBlockCount() { + return numBlocksWritten; + } + + /** + * Gets the number of bytes written in the latest memory segment. + * + * @return The number of bytes written in the latest memory segment. + */ + public int getBytesInLatestSegment() { + return bytesInLatestSegment; + } + + @Override + protected MemorySegment nextSegment(MemorySegment current, int posInSegment) throws IOException { + if (current != null) { + writeSegment(current, posInSegment); + } + return writer.getNextReturnedSegment(); + } + + private void writeSegment(MemorySegment segment, int writePosition) throws IOException { + writer.writeBlock(segment); + numBlocksWritten++; + bytesInLatestSegment = writePosition; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java new file mode 100644 index 0000000..e97a1ff --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memorymanager.AbstractPagedInputView; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.util.MathUtils; + +/** + * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a {@link BlockChannelReader}, + * making it effectively a data input stream. The view reads its data in blocks from the underlying channel. + * The view can read data that has been written by a {@link FileChannelOutputView}, or that was written in blocks + * in another fashion. 
+ */ +public class SeekableFileChannelInputView extends AbstractPagedInputView { + + private BlockChannelReader reader; + + private final IOManager ioManager; + + private final FileIOChannel.ID channelId; + + private final MemoryManager memManager; + + private final List<MemorySegment> memory; + + private final int sizeOfLastBlock; + + private final int numBlocksTotal; + + private final int segmentSize; + + private int numRequestsRemaining; + + private int numBlocksRemaining; + + // -------------------------------------------------------------------------------------------- + + public SeekableFileChannelInputView(IOManager ioManager, FileIOChannel.ID channelId, MemoryManager memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException { + super(0); + + checkNotNull(ioManager); + checkNotNull(channelId); + checkNotNull(memManager); + checkNotNull(memory); + + this.ioManager = ioManager; + this.channelId = channelId; + this.memManager = memManager; + this.memory = memory; + this.sizeOfLastBlock = sizeOfLastBlock; + this.segmentSize = memManager.getPageSize(); + + this.reader = ioManager.createBlockChannelReader(channelId); + + try { + final long channelLength = reader.getSize(); + + final int blockCount = MathUtils.checkedDownCast(channelLength / segmentSize); + this.numBlocksTotal = (channelLength % segmentSize == 0) ? 
blockCount : blockCount + 1; + + this.numBlocksRemaining = this.numBlocksTotal; + this.numRequestsRemaining = numBlocksRemaining; + + for (int i = 0; i < memory.size(); i++) { + sendReadRequest(memory.get(i)); + } + + advance(); + } + catch (IOException e) { + memManager.release(memory); + throw e; + } + } + + public void seek(long position) throws IOException { + final int block = MathUtils.checkedDownCast(position / segmentSize); + final int positionInBlock = (int) (position % segmentSize); + + if (position < 0 || block >= numBlocksTotal || (block == numBlocksTotal - 1 && positionInBlock > sizeOfLastBlock)) { + throw new IllegalArgumentException("Position is out of range"); + } + + clear(); + if (reader != null) { + reader.close(); + } + + reader = ioManager.createBlockChannelReader(channelId); + + if (block > 0) { + reader.seekToPosition(block * segmentSize); + } + + this.numBlocksRemaining = this.numBlocksTotal - block; + this.numRequestsRemaining = numBlocksRemaining; + + for (int i = 0; i < memory.size(); i++) { + sendReadRequest(memory.get(i)); + } + + numBlocksRemaining--; + seekInput(reader.getNextReturnedSegment(), positionInBlock, numBlocksRemaining == 0 ? sizeOfLastBlock : segmentSize); + } + + public void close() throws IOException { + close(false); + } + + public void closeAndDelete() throws IOException { + close(true); + } + + private void close(boolean deleteFile) throws IOException { + try { + clear(); + if (deleteFile) { + reader.closeAndDelete(); + } else { + reader.close(); + } + } finally { + synchronized (memory) { + memManager.release(memory); + memory.clear(); + } + } + } + + @Override + protected MemorySegment nextSegment(MemorySegment current) throws IOException { + // check for end-of-stream + if (numBlocksRemaining <= 0) { + reader.close(); + throw new EOFException(); + } + + // send a request first. 
if we have only a single segment, this same segment will be the one obtained in the next lines + if (current != null) { + sendReadRequest(current); + } + + // get the next segment + numBlocksRemaining--; + return reader.getNextReturnedSegment(); + } + + @Override + protected int getLimitForSegment(MemorySegment segment) { + return numBlocksRemaining > 0 ? segment.size() : sizeOfLastBlock; + } + + private void sendReadRequest(MemorySegment seg) throws IOException { + if (numRequestsRemaining > 0) { + reader.readBlock(seg); + numRequestsRemaining--; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java index ecb794e..3991167 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java @@ -64,17 +64,18 @@ public abstract class AbstractFileIOChannel implements FileIOChannel { // -------------------------------------------------------------------------------------------- - /** - * Gets the channel ID of this channel. - * - * @return This channel's ID. - */ @Override public final FileIOChannel.ID getChannelID() { return this.id; } @Override + public long getSize() throws IOException { + FileChannel channel = fileChannel; + return channel == null ? 
0 : channel.size(); + } + + @Override public abstract boolean isClosed(); @Override @@ -103,4 +104,4 @@ public abstract class AbstractFileIOChannel implements FileIOChannel { deleteChannel(); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java index a15acb5..acfa71f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java @@ -115,4 +115,9 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg public LinkedBlockingQueue<MemorySegment> getReturnQueue() { return this.returnSegments; } -} + + @Override + public void seekToPosition(long position) throws IOException { + this.requestQueue.add(new SeekRequest(this, position)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java index 89ebb25..9a9ee61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java @@ -72,7 +72,7 @@ 
public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends * @throws IOException Thrown, if the channel could no be opened. */ protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, RequestQueue<R> requestQueue, - RequestDoneCallback<T> callback, boolean writeEnabled) throws IOException + RequestDoneCallback callback, boolean writeEnabled) throws IOException { super(channelID, writeEnabled); @@ -113,7 +113,9 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends this.closeLock.wait(1000); checkErroneous(); } - catch (InterruptedException ignored) {} + catch (InterruptedException iex) { + throw new IOException("Closing of asynchronous file channel was interrupted."); + } } } finally { @@ -181,13 +183,11 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends } } finally { - // decrement the number of missing buffers. If we are currently closing, notify the - if (this.closed) { - synchronized (this.closeLock) { - int num = this.requestsNotReturned.decrementAndGet(); - if (num == 0) { - this.closeLock.notifyAll(); - } + // decrement the number of missing buffers. 
If we are currently closing, notify the waiters + synchronized (this.closeLock) { + final int num = this.requestsNotReturned.decrementAndGet(); + if (this.closed && num == 0) { + this.closeLock.notifyAll(); } } else { http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java index f25827a..8f7f218 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java @@ -60,5 +60,12 @@ public interface BlockChannelReader extends FileIOChannel { * @return The queue with the full memory segments. */ LinkedBlockingQueue<MemorySegment> getReturnQueue(); + + /** + * Seeks the underlying file channel to the given position. + * + * @param position The position to seek to. 
+ */ + void seekToPosition(long position) throws IOException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java index 7c9d31b..d6f4458 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java @@ -38,6 +38,13 @@ public interface FileIOChannel { FileIOChannel.ID getChannelID(); /** + * Gets the size (in bytes) of the file underlying the channel. + * + * @return The size (in bytes) of the file underlying the channel. + */ + long getSize() throws IOException; + + /** * Checks whether the channel has been closed. * * @return True if the channel has been closed, false otherwise. 
@@ -153,4 +160,4 @@ public interface FileIOChannel { return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index 7de8651..6489396 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -231,6 +231,18 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle checkState(!shutdown, "I/O-Manger is closed."); return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks); } + + // ------------------------------------------------------------------------- + // For Testing + // ------------------------------------------------------------------------- + + RequestQueue<ReadRequest> getReadRequestQueue(FileIOChannel.ID channelID) { + return this.readers[channelID.getThreadNum()].requestQueue; + } + + RequestQueue<WriteRequest> getWriteRequestQueue(FileIOChannel.ID channelID) { + return this.writers[channelID.getThreadNum()].requestQueue; + } // ------------------------------------------------------------------------- // I/O Worker Threads @@ -446,4 +458,4 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle } }; // end writer thread -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java index 875b223..1ab6931 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java @@ -119,8 +119,7 @@ public interface MemoryManager { void shutdown(); /** - * Checks if the memory manager all memory available and the descriptors of the free segments - * describe a contiguous memory layout. + * Checks if the memory manager has all memory available. * * @return True, if the memory manager is empty and valid, false if it is not empty or corrupted. */ http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java new file mode 100644 index 0000000..27928a9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.operators.testutils.PairGenerator; +import org.apache.flink.runtime.operators.testutils.PairGenerator.KeyMode; +import org.apache.flink.runtime.operators.testutils.PairGenerator.Pair; +import org.apache.flink.runtime.operators.testutils.PairGenerator.ValueMode; + +import java.io.EOFException; +import java.util.List; + +public class FileChannelStreamsITCase { + + private static final long SEED = 649180756312423613L; + + private static final int KEY_MAX = Integer.MAX_VALUE; + + private static final int VALUE_SHORT_LENGTH = 114; + + private static final int VALUE_LONG_LENGTH = 112 * 1024; + + private static final int NUM_PAIRS_SHORT = 1000000; + + private static final int NUM_PAIRS_LONG = 3000; + + private static final int MEMORY_PAGE_SIZE = 32 * 1024; + + private static final int 
NUM_MEMORY_SEGMENTS = 3; + + private IOManager ioManager; + + private MemoryManager memManager; + + // -------------------------------------------------------------------------------------------- + + @Before + public void beforeTest() { + memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE); + ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + ioManager.shutdown(); + assertTrue("I/O Manager was not properly shut down.", ioManager.isProperlyShutDown()); + assertTrue("The memory has not been properly released", memManager.verifyEmpty()); + } + + // -------------------------------------------------------------------------------------------- + + @Test + public void testWriteReadSmallRecords() { + try { + List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS); + + final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + final FileIOChannel.ID channel = ioManager.createChannel(); + + // create the writer output view + final BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel); + final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE); + + // write a number of pairs + Pair pair = new Pair(); + for (int i = 0; i < NUM_PAIRS_SHORT; i++) { + generator.next(pair); + pair.write(outView); + } + outView.close(); + + // create the reader input view + List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS); + + final BlockChannelReader reader = ioManager.createBlockChannelReader(channel); + final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment()); + generator.reset(); + + // read and re-generate all records and compare them + Pair readPair = new Pair(); + for (int i = 0; i < NUM_PAIRS_SHORT; i++) { + 
generator.next(pair); + readPair.read(inView); + assertEquals("The re-generated and the read record do not match.", pair, readPair); + } + + inView.close(); + reader.deleteChannel(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testWriteAndReadLongRecords() { + try { + final List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS); + + final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + final FileIOChannel.ID channel = this.ioManager.createChannel(); + + // create the writer output view + final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel); + final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE); + + // write a number of pairs + Pair pair = new Pair(); + for (int i = 0; i < NUM_PAIRS_LONG; i++) { + generator.next(pair); + pair.write(outView); + } + outView.close(); + + // create the reader input view + List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS); + + final BlockChannelReader reader = ioManager.createBlockChannelReader(channel); + final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment()); + generator.reset(); + + // read and re-generate all records and compare them + Pair readPair = new Pair(); + for (int i = 0; i < NUM_PAIRS_LONG; i++) { + generator.next(pair); + readPair.read(inView); + assertEquals("The re-generated and the read record do not match.", pair, readPair); + } + + inView.close(); + reader.deleteChannel(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testReadTooMany() { + try { + final List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS); + + final PairGenerator 
generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + final FileIOChannel.ID channel = this.ioManager.createChannel(); + + // create the writer output view + final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel); + final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE); + + // write a number of pairs + Pair pair = new Pair(); + for (int i = 0; i < NUM_PAIRS_SHORT; i++) { + generator.next(pair); + pair.write(outView); + } + outView.close(); + + // create the reader input view + List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS); + + final BlockChannelReader reader = ioManager.createBlockChannelReader(channel); + final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment()); + generator.reset(); + + // read and re-generate all records and compare them + try { + Pair readPair = new Pair(); + for (int i = 0; i < NUM_PAIRS_SHORT + 1; i++) { + generator.next(pair); + readPair.read(inView); + assertEquals("The re-generated and the read record do not match.", pair, readPair); + } + fail("Expected an EOFException which did not occur."); + } + catch (EOFException eofex) { + // expected + } + + inView.close(); + reader.deleteChannel(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testWriteReadOneBufferOnly() { + try { + final List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), 1); + + final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + final FileIOChannel.ID channel = this.ioManager.createChannel(); + + // create the writer output view + final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel); + final FileChannelOutputView outView = new 
FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE); + + // write a number of pairs + Pair pair = new Pair(); + for (int i = 0; i < NUM_PAIRS_SHORT; i++) { + generator.next(pair); + pair.write(outView); + } + outView.close(); + + // create the reader input view + List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), 1); + + final BlockChannelReader reader = ioManager.createBlockChannelReader(channel); + final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment()); + generator.reset(); + + // read and re-generate all records and compare them + Pair readPair = new Pair(); + for (int i = 0; i < NUM_PAIRS_SHORT; i++) { + generator.next(pair); + readPair.read(inView); + assertEquals("The re-generated and the read record do not match.", pair, readPair); + } + + inView.close(); + reader.deleteChannel(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testWriteReadNotAll() { + try { + final List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS); + + final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + final FileIOChannel.ID channel = this.ioManager.createChannel(); + + // create the writer output view + final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel); + final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE); + + // write a number of pairs + Pair pair = new Pair(); + for (int i = 0; i < NUM_PAIRS_SHORT; i++) { + generator.next(pair); + pair.write(outView); + } + outView.close(); + + // create the reader input view + List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS); + + final BlockChannelReader reader = ioManager.createBlockChannelReader(channel); + final 
FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment()); + generator.reset(); + + // read and re-generate all records and compare them + Pair readPair = new Pair(); + for (int i = 0; i < NUM_PAIRS_SHORT / 2; i++) { + generator.next(pair); + readPair.read(inView); + assertEquals("The re-generated and the read record do not match.", pair, readPair); + } + + inView.close(); + reader.deleteChannel(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java new file mode 100644 index 0000000..1db2a6f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.disk; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.FileWriter; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.types.StringValue; +import org.junit.Test; + + +public class FileChannelStreamsTest { + + @Test + public void testCloseAndDeleteOutputView() { + final IOManager ioManager = new IOManagerAsync(); + try { + MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024); + List<MemorySegment> memory = new ArrayList<MemorySegment>(); + memMan.allocatePages(new DummyInvokable(), memory, 4); + + FileIOChannel.ID channel = ioManager.createChannel(); + BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel); + + FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize()); + new StringValue("Some test text").write(out); + + // close for the first time, make sure all memory returns + out.close(); + assertTrue(memMan.verifyEmpty()); + + // close again, should not cause an exception + out.close(); + + // delete, make sure file is removed + out.closeAndDelete(); + assertFalse(new File(channel.getPath()).exists()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + ioManager.shutdown(); + } + } + + @Test + public void testCloseAndDeleteInputView() { + final IOManager ioManager = 
new IOManagerAsync(); + try { + MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024); + List<MemorySegment> memory = new ArrayList<MemorySegment>(); + memMan.allocatePages(new DummyInvokable(), memory, 4); + + FileIOChannel.ID channel = ioManager.createChannel(); + + // add some test data + { + FileWriter wrt = new FileWriter(channel.getPath()); + wrt.write("test data"); + wrt.close(); + } + + BlockChannelReader reader = ioManager.createBlockChannelReader(channel); + FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9); + + // read just something + in.readInt(); + + // close for the first time, make sure all memory returns + in.close(); + assertTrue(memMan.verifyEmpty()); + + // close again, should not cause an exception + in.close(); + + // delete, make sure file is removed + in.closeAndDelete(); + assertFalse(new File(channel.getPath()).exists()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + ioManager.shutdown(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java new file mode 100644 index 0000000..7e4d70d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.disk; + +import static org.junit.Assert.*; + +import java.io.EOFException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; +import org.apache.flink.runtime.memorymanager.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.junit.Test; + + +public class SeekableFileChannelInputViewTest { + + @Test + public void testSeek() { + final IOManager ioManager = new IOManagerAsync(); + final int PAGE_SIZE = 16 * 1024; + final int NUM_RECORDS = 120000; + // integers across 7.x pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes) + + try { + MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE); + List<MemorySegment> memory = new ArrayList<MemorySegment>(); + memMan.allocatePages(new DummyInvokable(), memory, 4); + + FileIOChannel.ID channel = ioManager.createChannel(); + BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel); + FileChannelOutputView out = new 
FileChannelOutputView(writer, memMan, memory, memMan.getPageSize()); + + // write some integers across about 7.3 pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes) + for (int i = 0; i < NUM_RECORDS; i += 4) { + out.writeInt(i); + } + // close for the first time, make sure all memory returns + out.close(); + assertTrue(memMan.verifyEmpty()); + + memMan.allocatePages(new DummyInvokable(), memory, 4); + SeekableFileChannelInputView in = new SeekableFileChannelInputView(ioManager, channel, memMan, memory, out.getBytesInLatestSegment()); + + // read first, complete + for (int i = 0; i < NUM_RECORDS; i += 4) { + assertEquals(i, in.readInt()); + } + try { + in.readInt(); + fail("should throw EOF exception"); + } catch (EOFException e) {} + + // seek to a quarter into the 3rd page + int i = 2 * PAGE_SIZE + PAGE_SIZE / 4; + in.seek(i); + for (; i < NUM_RECORDS; i += 4) { + assertEquals(i, in.readInt()); + } + try { + in.readInt(); + fail("should throw EOF exception"); + } catch (EOFException e) {} + + // seek to the last record + i = 120000 - 4; + in.seek(i); + for (; i < NUM_RECORDS; i += 4) { + assertEquals(i, in.readInt()); + } + try { + in.readInt(); + fail("should throw EOF exception"); + } catch (EOFException e) {} + + // seek to the beginning + i = 0; + in.seek(i); + for (; i < NUM_RECORDS; i += 4) { + assertEquals(i, in.readInt()); + } + try { + in.readInt(); + fail("should throw EOF exception"); + } catch (EOFException e) {} + + // seek to after a page + i = PAGE_SIZE; + in.seek(i); + for (; i < NUM_RECORDS; i += 4) { + assertEquals(i, in.readInt()); + } + try { + in.readInt(); + fail("should throw EOF exception"); + } catch (EOFException e) {} + + // seek to after three pages + i = 3 * PAGE_SIZE; + in.seek(i); + for (; i < NUM_RECORDS; i += 4) { + assertEquals(i, in.readInt()); + } + try { + in.readInt(); + fail("should throw EOF exception"); + } catch (EOFException e) {} + + // seek to the end + i = NUM_RECORDS; + in.seek(i); + try { + in.readInt(); + fail("should throw EOF
exception"); + } catch (EOFException e) {} + + // seek out of bounds + try { + in.seek(-10); + fail("should throw an exception"); + } catch (IllegalArgumentException e) {} + try { + in.seek(NUM_RECORDS + 1); + fail("should throw an exception"); + } catch (IllegalArgumentException e) {} + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + ioManager.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java new file mode 100644 index 0000000..1e9d4d4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.disk.iomanager; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.flink.core.memory.MemorySegment; +import org.junit.Test; + +public class AsynchronousFileIOChannelsTest { + + @Test + public void testClosingWaits() { + IOManagerAsync ioMan = new IOManagerAsync(); + try { + + final int NUM_BLOCKS = 100; + final MemorySegment seg = new MemorySegment(new byte[32 * 1024]); + + final AtomicInteger callbackCounter = new AtomicInteger(); + final AtomicBoolean exceptionOccurred = new AtomicBoolean(); + + final RequestDoneCallback callback = new RequestDoneCallback() { + + @Override + public void requestSuccessful(MemorySegment buffer) { + // we do the non safe variant. the callbacks should come in order from + // the same thread, so it should always work + callbackCounter.set(callbackCounter.get() + 1); + + if (buffer != seg) { + exceptionOccurred.set(true); + } + } + + @Override + public void requestFailed(MemorySegment buffer, IOException e) { + exceptionOccurred.set(true); + } + }; + + BlockChannelWriterWithCallback writer = ioMan.createBlockChannelWriter(ioMan.createChannel(), callback); + try { + for (int i = 0; i < NUM_BLOCKS; i++) { + writer.writeBlock(seg); + } + + writer.close(); + + assertEquals(NUM_BLOCKS, callbackCounter.get()); + assertFalse(exceptionOccurred.get()); + } + finally { + writer.closeAndDelete(); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + ioMan.shutdown(); + } + } + + @Test + public void testExceptionForwardsToClose() { + IOManagerAsync ioMan = new IOManagerAsync(); + try { + testExceptionForwardsToClose(ioMan, 100, 1); + testExceptionForwardsToClose(ioMan, 100, 50); + testExceptionForwardsToClose(ioMan, 100, 100); + } + finally { + ioMan.shutdown(); + } + } + + private void 
testExceptionForwardsToClose(IOManagerAsync ioMan, final int numBlocks, final int failingBlock) { + try { + MemorySegment seg = new MemorySegment(new byte[32 * 1024]); + FileIOChannel.ID channelId = ioMan.createChannel(); + + BlockChannelWriterWithCallback writer = new AsynchronousBlockWriterWithCallback(channelId, + ioMan.getWriteRequestQueue(channelId), new NoOpCallback()) { + + private int numBlocks; + + @Override + public void writeBlock(MemorySegment segment) throws IOException { + numBlocks++; + + if (numBlocks == failingBlock) { + this.requestsNotReturned.incrementAndGet(); + this.requestQueue.add(new FailingWriteRequest(this, segment)); + } else { + super.writeBlock(segment); + } + } + }; + + try { + for (int i = 0; i < numBlocks; i++) { + writer.writeBlock(seg); + } + + writer.close(); + fail("did not forward exception"); + } + catch (IOException e) { + // expected + } + finally { + try { + writer.closeAndDelete(); + } catch (Throwable t) {} + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static class NoOpCallback implements RequestDoneCallback { + + @Override + public void requestSuccessful(MemorySegment buffer) {} + + @Override + public void requestFailed(MemorySegment buffer, IOException e) {} + } + + private static class FailingWriteRequest implements WriteRequest { + + private final AsynchronousFileIOChannel<WriteRequest> channel; + + private final MemorySegment segment; + + protected FailingWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) { + this.channel = targetChannel; + this.segment = segment; + } + + @Override + public void write() throws IOException { + throw new IOException(); + } + + @Override + public void requestDone(IOException ioex) { + this.channel.handleProcessedBuffer(this.segment, ioex); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java new file mode 100644 index 0000000..297eeed --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.disk.iomanager; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.ReadRequest; +import org.apache.flink.runtime.io.disk.iomanager.WriteRequest; + +public class IOManagerAsyncTest { + + private IOManagerAsync ioManager; + + // ------------------------------------------------------------------------ + // Setup & Shutdown + // ------------------------------------------------------------------------ + + @Before + public void beforeTest() { + ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + this.ioManager.shutdown(); + assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); + } + + // ------------------------------------------------------------------------ + // Test Methods + // ------------------------------------------------------------------------ + + @Test + public void channelReadWriteOneSegment() { + final int NUM_IOS = 1111; + + try { + final FileIOChannel.ID channelID = this.ioManager.createChannel(); + final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID); + + MemorySegment memSeg = new MemorySegment(new byte[32 * 1024]); + + for (int i = 0; i < NUM_IOS; i++) { + for (int pos = 0; pos < memSeg.size(); pos += 4) { + memSeg.putInt(pos, i); + } + + writer.writeBlock(memSeg); + memSeg = writer.getNextReturnedSegment(); + } + + writer.close(); + + final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID); 
+ for (int i = 0; i < NUM_IOS; i++) { + reader.readBlock(memSeg); + memSeg = reader.getNextReturnedSegment(); + + for (int pos = 0; pos < memSeg.size(); pos += 4) { + if (memSeg.getInt(pos) != i) { + fail("Read memory segment contains invalid data."); + } + } + } + + reader.closeAndDelete(); + } + catch (Exception ex) { + ex.printStackTrace(); + fail("Test encountered an exception: " + ex.getMessage()); + } + } + + @Test + public void channelReadWriteMultipleSegments() { + final int NUM_IOS = 1111; + final int NUM_SEGS = 16; + + try { + final List<MemorySegment> memSegs = new ArrayList<MemorySegment>(); + for (int i = 0; i < NUM_SEGS; i++) { + memSegs.add(new MemorySegment(new byte[32 * 1024])); + } + + final FileIOChannel.ID channelID = this.ioManager.createChannel(); + final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID); + + for (int i = 0; i < NUM_IOS; i++) { + final MemorySegment memSeg = memSegs.isEmpty() ? writer.getNextReturnedSegment() : memSegs.remove(memSegs.size() - 1); + + for (int pos = 0; pos < memSeg.size(); pos += 4) { + memSeg.putInt(pos, i); + } + + writer.writeBlock(memSeg); + } + writer.close(); + + // get back the memory + while (memSegs.size() < NUM_SEGS) { + memSegs.add(writer.getNextReturnedSegment()); + } + + final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID); + while(!memSegs.isEmpty()) { + reader.readBlock(memSegs.remove(0)); + } + + for (int i = 0; i < NUM_IOS; i++) { + final MemorySegment memSeg = reader.getNextReturnedSegment(); + + for (int pos = 0; pos < memSeg.size(); pos += 4) { + if (memSeg.getInt(pos) != i) { + fail("Read memory segment contains invalid data."); + } + } + reader.readBlock(memSeg); + } + + reader.closeAndDelete(); + + // get back the memory + while (memSegs.size() < NUM_SEGS) { + memSegs.add(reader.getNextReturnedSegment()); + } + } + catch (Exception ex) { + ex.printStackTrace(); + fail("TEst encountered an exception: " + ex.getMessage()); + } + } 
+ + @Test + public void testExceptionPropagationReader() { + try { + // use atomic boolean as a boolean reference + final AtomicBoolean handlerCalled = new AtomicBoolean(); + final AtomicBoolean exceptionForwarded = new AtomicBoolean(); + + ReadRequest req = new ReadRequest() { + + @Override + public void requestDone(IOException ioex) { + if (ioex instanceof TestIOException) { + exceptionForwarded.set(true); + } + + synchronized (handlerCalled) { + handlerCalled.set(true); + handlerCalled.notifyAll(); + } + } + + @Override + public void read() throws IOException { + throw new TestIOException(); + } + }; + + + // test the read queue + RequestQueue<ReadRequest> rq = ioManager.getReadRequestQueue(ioManager.createChannel()); + rq.add(req); + + // wait until the asynchronous request has been handled + synchronized (handlerCalled) { + while (!handlerCalled.get()) { + handlerCalled.wait(); + } + } + + assertTrue(exceptionForwarded.get()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testExceptionPropagationWriter() { + try { + // use atomic boolean as a boolean reference + final AtomicBoolean handlerCalled = new AtomicBoolean(); + final AtomicBoolean exceptionForwarded = new AtomicBoolean(); + + WriteRequest req = new WriteRequest() { + + @Override + public void requestDone(IOException ioex) { + if (ioex instanceof TestIOException) { + exceptionForwarded.set(true); + } + + synchronized (handlerCalled) { + handlerCalled.set(true); + handlerCalled.notifyAll(); + } + } + + @Override + public void write() throws IOException { + throw new TestIOException(); + } + }; + + + // test the write queue + RequestQueue<WriteRequest> rq = ioManager.getWriteRequestQueue(ioManager.createChannel()); + rq.add(req); + + // wait until the asynchronous request has been handled + synchronized (handlerCalled) { + while (!handlerCalled.get()) { + handlerCalled.wait(); + } + } + + assertTrue(exceptionForwarded.get()); + } + catch
(Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testExceptionInCallbackRead() { + try { + final AtomicBoolean handlerCalled = new AtomicBoolean(); + + ReadRequest regularRequest = new ReadRequest() { + + @Override + public void requestDone(IOException ioex) { + synchronized (handlerCalled) { + handlerCalled.set(true); + handlerCalled.notifyAll(); + } + } + + @Override + public void read() {} + }; + + ReadRequest exceptionThrower = new ReadRequest() { + + @Override + public void requestDone(IOException ioex) { + throw new RuntimeException(); + } + + @Override + public void read() {} + }; + + RequestQueue<ReadRequest> rq = ioManager.getReadRequestQueue(ioManager.createChannel()); + + // queue first an exception thrower, then a regular request. + // we check that the regular request gets successfully handled + rq.add(exceptionThrower); + rq.add(regularRequest); + + synchronized (handlerCalled) { + while (!handlerCalled.get()) { + handlerCalled.wait(); + } + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testExceptionInCallbackWrite() { + try { + final AtomicBoolean handlerCalled = new AtomicBoolean(); + + WriteRequest regularRequest = new WriteRequest() { + + @Override + public void requestDone(IOException ioex) { + synchronized (handlerCalled) { + handlerCalled.set(true); + handlerCalled.notifyAll(); + } + } + + @Override + public void write() {} + }; + + WriteRequest exceptionThrower = new WriteRequest() { + + @Override + public void requestDone(IOException ioex) { + throw new RuntimeException(); + } + + @Override + public void write() {} + }; + + RequestQueue<WriteRequest> rq = ioManager.getWriteRequestQueue(ioManager.createChannel()); + + // queue first an exception thrower, then a regular request. 
+ // we check that the regular request gets successfully handled + rq.add(exceptionThrower); + rq.add(regularRequest); + + synchronized (handlerCalled) { + while (!handlerCalled.get()) { + handlerCalled.wait(); + } + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + + + final class TestIOException extends IOException { + private static final long serialVersionUID = -814705441998024472L; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java index 78951d3..f1d5337 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java @@ -25,9 +25,9 @@ import java.util.List; import java.util.Random; import org.junit.Assert; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; @@ -36,22 +36,17 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.DefaultMemoryManagerTest; import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; /** * Integration test case for the I/O manager. 
*/ public class IOManagerITCase { - private static final Logger LOG = LoggerFactory.getLogger(IOManagerITCase.class); - private static final long SEED = 649180756312423613L; - private static final int NUMBER_OF_SEGMENTS = 10; // 10 - - private static final int SEGMENT_SIZE = 1024 * 1024; // 1M + private static final int MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL = 10; + + private static final int MEMORY_SIZE = 10 * 1024 * 1024; // 10 MB private final int NUM_CHANNELS = 29; @@ -63,7 +58,7 @@ public class IOManagerITCase { @Before public void beforeTest() { - memoryManager = new DefaultMemoryManager(NUMBER_OF_SEGMENTS * SEGMENT_SIZE, 1); + memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1); ioManager = new IOManagerAsync(); } @@ -84,10 +79,7 @@ public class IOManagerITCase { * parallel. It is designed to check the ability of the IO manager to correctly handle multiple threads. */ @Test - public void parallelChannelsTest() throws Exception - { - LOG.info("Starting parallel channels test."); - + public void parallelChannelsTest() throws Exception { final Random rnd = new Random(SEED); final AbstractInvokable memOwner = new DefaultMemoryManagerTest.DummyInvokable(); @@ -106,7 +98,7 @@ public class IOManagerITCase { ids[i] = this.ioManager.createChannel(); writers[i] = this.ioManager.createBlockChannelWriter(ids[i]); - List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(NUMBER_OF_SEGMENTS - 2) + 2); + List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL - 1) + 1); outs[i] = new ChannelWriterOutputView(writers[i], memSegs, this.memoryManager.getPageSize()); } @@ -114,24 +106,13 @@ public class IOManagerITCase { Value val = new Value(); // write a lot of values unevenly distributed over the channels - int nextLogCount = 0; - float nextLogFraction = 0.0f; - LOG.info("Writing to channels..."); for (int i = 0; i < NUMBERS_TO_BE_WRITTEN; i++) { - - if (i == nextLogCount) { 
- LOG.info("... " + (int) (nextLogFraction * 100) + "% done."); - nextLogFraction += 0.05; - nextLogCount = (int) (nextLogFraction * NUMBERS_TO_BE_WRITTEN); - } - int channel = skewedSample(rnd, NUM_CHANNELS - 1); val.value = String.valueOf(writingCounters[channel]++); val.write(outs[channel]); } - LOG.info("Writing done, flushing contents..."); // close all writers for (int i = 0; i < NUM_CHANNELS; i++) { @@ -141,12 +122,9 @@ public class IOManagerITCase { writers = null; // instantiate the readers for sequential read - LOG.info("Reading channels sequentially..."); - for (int i = 0; i < NUM_CHANNELS; i++) - { - List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(NUMBER_OF_SEGMENTS - 2) + 2); + for (int i = 0; i < NUM_CHANNELS; i++) { - LOG.info("Reading channel " + (i+1) + "/" + NUM_CHANNELS + '.'); + List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL - 1) + 1); final BlockChannelReader reader = this.ioManager.createBlockChannelReader(ids[i]); final ChannelReaderInputView in = new ChannelReaderInputView(reader, memSegs, false); @@ -173,30 +151,19 @@ public class IOManagerITCase { this.memoryManager.release(in.close()); } - LOG.info("Sequential reading done."); // instantiate the readers - LOG.info("Reading channels randomly..."); for (int i = 0; i < NUM_CHANNELS; i++) { - List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(NUMBER_OF_SEGMENTS - 2) + 2); + List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL - 1) + 1); readers[i] = this.ioManager.createBlockChannelReader(ids[i]); ins[i] = new ChannelReaderInputView(readers[i], memSegs, false); } - nextLogCount = 0; - nextLogFraction = 0.0f; - // read a lot of values in a mixed order from the channels for (int i = 0; i < NUMBERS_TO_BE_WRITTEN; i++) { - if (i == nextLogCount) { - LOG.info("... 
" + (int) (nextLogFraction * 100) + "% done."); - nextLogFraction += 0.05; - nextLogCount = (int) (nextLogFraction * NUMBERS_TO_BE_WRITTEN); - } - while (true) { final int channel = skewedSample(rnd, NUM_CHANNELS - 1); if (ins[channel] != null) { @@ -222,7 +189,6 @@ public class IOManagerITCase { } } - LOG.info("Random reading done."); // close all readers for (int i = 0; i < NUM_CHANNELS; i++) { @@ -256,10 +222,9 @@ public class IOManagerITCase { protected static class Value implements IOReadableWritable { - String value; + private String value; - public Value() { - } + public Value() {} public Value(String val) { this.value = val; @@ -306,5 +271,4 @@ public class IOManagerITCase { return true; } } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java index fa6cb80..ab5c206 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java @@ -16,233 +16,96 @@ * limitations under the License. 
*/ - package org.apache.flink.runtime.io.disk.iomanager; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.memory.DefaultMemoryManagerTest.DummyInvokable; -import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.File; -import java.io.IOException; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; -public class IOManagerTest { - - // ------------------------------------------------------------------------ - // Cross Test Fields - // ------------------------------------------------------------------------ - - private IOManager ioManager; - - private DefaultMemoryManager memoryManager; - - // ------------------------------------------------------------------------ - // Setup & Shutdown - // ------------------------------------------------------------------------ - - @Before - public void beforeTest() { - this.memoryManager = new DefaultMemoryManager(32 * 1024 * 1024, 1); - this.ioManager = new IOManagerAsync(); - } - - @After - public void afterTest() { - this.ioManager.shutdown(); - Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); - - Assert.assertTrue("Not all memory was returned to the memory manager in the test.", this.memoryManager.verifyEmpty()); - this.memoryManager.shutdown(); - this.memoryManager = null; - } +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID; +import org.junit.Test; - // ------------------------------------------------------------------------ - // Test Methods - // ------------------------------------------------------------------------ - - // ------------------------------------------------------------------------ 
+public class IOManagerTest { - /** - * Tests that the channel enumerator creates channels in the temporary files directory. - */ @Test public void channelEnumerator() { - File tempPath = new File(System.getProperty("java.io.tmpdir")); + File tempPath = new File(System.getProperty("java.io.tmpdir")); - FileIOChannel.Enumerator enumerator = ioManager.createChannelEnumerator(); + String[] tempDirs = new String[] { + new File(tempPath, "a").getAbsolutePath(), + new File(tempPath, "b").getAbsolutePath(), + new File(tempPath, "c").getAbsolutePath(), + new File(tempPath, "d").getAbsolutePath(), + new File(tempPath, "e").getAbsolutePath(), + }; + + int[] counters = new int[tempDirs.length]; + + + FileIOChannel.Enumerator enumerator = new TestIOManager(tempDirs).createChannelEnumerator(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 3 * tempDirs.length; i++) { FileIOChannel.ID id = enumerator.next(); File path = new File(id.getPath()); - Assert.assertTrue("Channel IDs must name an absolute path.", path.isAbsolute()); - Assert.assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory()); - Assert.assertTrue("Path is not in the temp directory.", tempPath.equals(path.getParentFile())); - } - } - - // ------------------------------------------------------------------------ - - @Test - public void channelReadWriteOneSegment() { - final int NUM_IOS = 1111; - - try { - final FileIOChannel.ID channelID = this.ioManager.createChannel(); - final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID); - MemorySegment memSeg = this.memoryManager.allocatePages(new DummyInvokable(), 1).get(0); + assertTrue("Channel IDs must name an absolute path.", path.isAbsolute()); - for (int i = 0; i < NUM_IOS; i++) { - for (int pos = 0; pos < memSeg.size(); pos += 4) { - memSeg.putInt(pos, i); - } - - writer.writeBlock(memSeg); - memSeg = writer.getNextReturnedSegment(); - } + assertFalse("Channel IDs must name a file, not a directory.", 
path.isDirectory()); - writer.close(); + assertTrue("Path is not in the temp directory.", tempPath.equals(path.getParentFile().getParentFile())); - final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID); - for (int i = 0; i < NUM_IOS; i++) { - reader.readBlock(memSeg); - memSeg = reader.getNextReturnedSegment(); - - for (int pos = 0; pos < memSeg.size(); pos += 4) { - if (memSeg.getInt(pos) != i) { - Assert.fail("Read memory segment contains invalid data."); - } + for (int k = 0; k < tempDirs.length; k++) { + if (path.getParent().equals(tempDirs[k])) { + counters[k]++; } } - - reader.closeAndDelete(); - - this.memoryManager.release(memSeg); - - } catch (Exception ex) { - ex.printStackTrace(); - Assert.fail("TEst encountered an exception: " + ex.getMessage()); } - } - - @Test - public void channelReadWriteMultipleSegments() { - final int NUM_IOS = 1111; - final int NUM_SEGS = 16; - try { - final List<MemorySegment> memSegs = this.memoryManager.allocatePages(new DummyInvokable(), NUM_SEGS); - final FileIOChannel.ID channelID = this.ioManager.createChannel(); - final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID); - - for (int i = 0; i < NUM_IOS; i++) { - final MemorySegment memSeg = memSegs.isEmpty() ? 
writer.getNextReturnedSegment() : memSegs.remove(0); - - for (int pos = 0; pos < memSeg.size(); pos += 4) { - memSeg.putInt(pos, i); - } - - writer.writeBlock(memSeg); - } - writer.close(); - - // get back the memory - while (memSegs.size() < NUM_SEGS) { - memSegs.add(writer.getNextReturnedSegment()); - } - - final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID); - while(!memSegs.isEmpty()) { - reader.readBlock(memSegs.remove(0)); - } - - for (int i = 0; i < NUM_IOS; i++) { - final MemorySegment memSeg = reader.getNextReturnedSegment(); - - for (int pos = 0; pos < memSeg.size(); pos += 4) { - if (memSeg.getInt(pos) != i) { - Assert.fail("Read memory segment contains invalid data."); - } - } - reader.readBlock(memSeg); - } - - reader.closeAndDelete(); - - // get back the memory - while (memSegs.size() < NUM_SEGS) { - memSegs.add(reader.getNextReturnedSegment()); - } - - this.memoryManager.release(memSegs); - - } catch (Exception ex) { - ex.printStackTrace(); - Assert.fail("TEst encountered an exception: " + ex.getMessage()); + for (int k = 0; k < tempDirs.length; k++) { + assertEquals(3, counters[k]); } } - - // ============================================================================================ - final class FailingSegmentReadRequest implements ReadRequest { - - private final AsynchronousFileIOChannel<MemorySegment, ReadRequest> channel; - - private final MemorySegment segment; - - protected FailingSegmentReadRequest(AsynchronousFileIOChannel<MemorySegment, ReadRequest> targetChannel, MemorySegment segment) { - this.channel = targetChannel; - this.segment = segment; + // -------------------------------------------------------------------------------------------- + + private static class TestIOManager extends IOManager { + + protected TestIOManager(String[] paths) { + super(paths); } + @Override + public void shutdown() {} @Override - public void read() throws IOException { - throw new TestIOException(); + public boolean 
isProperlyShutDown() { + return false; } - @Override - public void requestDone(IOException ioex) { - this.channel.handleProcessedBuffer(this.segment, ioex); + public BlockChannelWriter createBlockChannelWriter(ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) { + throw new UnsupportedOperationException(); } - } - //-------------------------------------------------------------------------------------------- - - /** - * Special write request that writes an entire memory segment to the block writer. - */ - final class FailingSegmentWriteRequest implements WriteRequest { - - private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel; - - private final MemorySegment segment; - - protected FailingSegmentWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) { - this.channel = targetChannel; - this.segment = segment; + @Override + public BlockChannelWriterWithCallback createBlockChannelWriter(ID channelID, RequestDoneCallback callback) { + throw new UnsupportedOperationException(); } @Override - public void write() throws IOException { - throw new TestIOException(); + public BlockChannelReader createBlockChannelReader(ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) { + throw new UnsupportedOperationException(); } @Override - public void requestDone(IOException ioex) { - this.channel.handleProcessedBuffer(this.segment, ioex); + public BulkBlockChannelReader createBulkBlockChannelReader(ID channelID, List<MemorySegment> targetSegments, int numBlocks) { + throw new UnsupportedOperationException(); } } - - - final class TestIOException extends IOException { - private static final long serialVersionUID = -814705441998024472L; - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/PairGenerator.java ---------------------------------------------------------------------- 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.operators.testutils;

import java.io.IOException;
import java.util.Random;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;

/**
 * Test utility that fills {@link Pair} objects with an int key and a string value.
 *
 * <p>The sequence of generated pairs is fully determined by the seed and the configured
 * modes; {@link #reset()} restarts the sequence from the beginning.
 */
public final class PairGenerator {

	/**
	 * A mutable (int key, string value) record that can be written to and read from
	 * data views. Instances are filled by {@link PairGenerator#next(Pair)}.
	 */
	public static class Pair implements Value {

		private static final long serialVersionUID = 1L;

		private int key;

		// Owned exclusively by this pair: the generator copies contents into it and never
		// replaces the instance, so read() cannot clobber state shared with other objects.
		private final StringValue value = new StringValue();


		public Pair() {}


		/** Returns the key of this pair. */
		public int getKey() {
			return key;
		}

		/** Returns the (mutable) value object of this pair. */
		public StringValue getValue() {
			return value;
		}

		/** Writes the key as an int, followed by the serialized value. */
		@Override
		public void write(DataOutputView out) throws IOException {
			out.writeInt(key);
			value.write(out);
		}

		/** Reads the key as an int, then deserializes the value in place. */
		@Override
		public void read(DataInputView in) throws IOException {
			key = in.readInt();
			value.read(in);
		}

		@Override
		public int hashCode() {
			return 31 * key + value.hashCode();
		}

		@Override
		public boolean equals(Object obj) {
			if (obj instanceof Pair) {
				Pair other = (Pair) obj;
				return other.key == this.key && other.value.equals(this.value);
			} else {
				return false;
			}
		}

		@Override
		public String toString() {
			return String.format("(%d, %s)", key, value);
		}
	}

	/** How keys are produced: an increasing counter, or pseudo-random values. */
	public enum KeyMode {
		SORTED, RANDOM
	}

	/** How values are produced: fixed-length random, varying-length random, or a constant. */
	public enum ValueMode {
		FIX_LENGTH, RANDOM_LENGTH, CONSTANT
	}

	// NOTE: the table holds 'a'..'m' twice (26 entries). Its length and order feed directly
	// into random.nextInt(...), so it must stay as is to keep generated sequences stable.
	private static final char[] ALPHABET = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
		'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };

	private final long seed;

	private final int keyMax;

	private final int valueLength;

	private final KeyMode keyMode;

	private final ValueMode valueMode;

	private Random random;

	private int counter;

	private final StringValue valueConstant;


	/**
	 * Creates a generator with random keys and fixed-length random values.
	 *
	 * @param seed seed of the pseudo-random sequence
	 * @param keyMax upper bound (inclusive) of random keys
	 * @param valueLength length of the generated values
	 */
	public PairGenerator(long seed, int keyMax, int valueLength) {
		this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
	}

	/**
	 * Creates a generator with the given key and value modes and an empty constant value.
	 *
	 * @param seed seed of the pseudo-random sequence
	 * @param keyMax upper bound (inclusive) of random keys
	 * @param valueLength (maximum) length of the generated values
	 * @param keyMode how keys are generated
	 * @param valueMode how values are generated
	 */
	public PairGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) {
		this(seed, keyMax, valueLength, keyMode, valueMode, null);
	}

	/**
	 * Creates a fully configured generator.
	 *
	 * @param seed seed of the pseudo-random sequence
	 * @param keyMax upper bound (inclusive) of random keys; must be non-zero when keys are
	 *               generated randomly, because it is used as a modulus in {@link #next(Pair)}
	 * @param valueLength (maximum) length of the generated values
	 * @param keyMode how keys are generated
	 * @param valueMode how values are generated
	 * @param constant the value used in {@link ValueMode#CONSTANT} mode; {@code null} leaves
	 *                 the constant empty
	 */
	public PairGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode, String constant) {
		this.seed = seed;
		this.keyMax = keyMax;
		this.valueLength = valueLength;
		this.keyMode = keyMode;
		this.valueMode = valueMode;

		this.random = new Random(seed);
		this.counter = 0;

		this.valueConstant = new StringValue();
		if (constant != null) {
			this.valueConstant.setValue(constant);
		}
	}

	/**
	 * Fills the given pair with the next key and value of the sequence.
	 *
	 * <p>In {@link KeyMode#SORTED} mode the key is an increasing counter starting at 1,
	 * otherwise a pseudo-random key in {@code [1, keyMax]} (for positive {@code keyMax}).
	 *
	 * @param target the pair to overwrite
	 */
	public void next(Pair target) {
		target.key = (keyMode == KeyMode.SORTED) ? ++counter : Math.abs(random.nextInt() % keyMax) + 1;

		if (valueMode == ValueMode.CONSTANT) {
			// Copy the contents instead of handing out the shared instance: otherwise a
			// later Pair.read() on the target would silently rewrite the generator's constant
			// (and every other pair that received it).
			target.value.setValue(valueConstant);
		} else {
			randomString(target.value);
		}
	}

	/** Restarts the generator so that it reproduces the same sequence from the beginning. */
	public void reset() {
		this.random = new Random(seed);
		this.counter = 0;
	}

	/**
	 * Overwrites the target with random characters. The length is {@code valueLength} in
	 * {@link ValueMode#FIX_LENGTH} mode; otherwise it is reduced by a random amount below
	 * one third of {@code valueLength}.
	 */
	private void randomString(StringValue target) {
		int length = valueLength;
		if (valueMode != ValueMode.FIX_LENGTH) {
			// Random.nextInt(bound) rejects non-positive bounds, which happens for
			// valueLength < 3; in that case there is nothing to subtract.
			final int maxReduction = valueLength / 3;
			if (maxReduction > 0) {
				length -= random.nextInt(maxReduction);
			}
		}

		target.setLength(0);
		for (int i = 0; i < length; i++) {
			target.append(ALPHABET[random.nextInt(ALPHABET.length)]);
		}
	}
}
