This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dfd08ab29ce2aa1c7f1f6699557f4b9515473ebe
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Apr 15 22:22:36 2019 +0200

    [hotfix] [tests] Network and Partition Tests pass in Testing IOManager rather than dysfunctional mock
---
 .../runtime/io/disk/iomanager/NoOpIOManager.java   | 73 ++++++++++++++++++++++
 .../runtime/io/network/NetworkEnvironmentTest.java |  5 +-
 .../io/network/partition/ResultPartitionTest.java  | 14 +----
 3 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java
new file mode 100644
index 0000000..f98c46f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/NoOpIOManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * An {@link IOManager} that cannot do I/O but serves as a mock for tests.
+ */
+public class NoOpIOManager extends IOManager {
+
+	public NoOpIOManager() {
+		super(new String[] {EnvironmentInformation.getTemporaryFileDirectory()});
+	}
+
+	@Override
+	public BlockChannelWriter<MemorySegment> createBlockChannelWriter(ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BlockChannelReader<MemorySegment> createBlockChannelReader(ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BufferFileWriter createBufferFileWriter(ID channelID) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BufferFileReader createBufferFileReader(ID channelID, RequestDoneCallback<Buffer> callback) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BufferFileSegmentReader createBufferFileSegmentReader(ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BulkBlockChannelReader createBulkBlockChannelReader(ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index bedd090..3a2014a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -276,6 +276,7 @@ public class NetworkEnvironmentTest {
 	 */
 	private static ResultPartition createResultPartition(
 			final ResultPartitionType partitionType, final int channels) {
+
 		return new ResultPartition(
 			"TestTask-" + partitionType + ":" + channels,
 			new NoOpTaskActions(),
@@ -286,7 +287,7 @@ public class NetworkEnvironmentTest {
 			channels,
 			mock(ResultPartitionManager.class),
 			new NoOpResultPartitionConsumableNotifier(),
-			mock(IOManager.class),
+			new NoOpIOManager(),
 			false);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 8456e9c..9e3c117 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -19,8 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
@@ -29,7 +28,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -50,14 +48,6 @@ import static org.mockito.Mockito.verify;
  */
 public class ResultPartitionTest {
 
-	/** Asynchronous I/O manager. */
-	private static final IOManager ioManager = new IOManagerAsync();
-
-	@AfterClass
-	public static void shutdown() {
-		ioManager.shutdown();
-	}
-
 	/**
 	 * Tests the schedule or update consumers message sending behaviour depending on the relevant flags.
 	 */
@@ -277,7 +267,7 @@ public class ResultPartitionTest {
 			1,
 			mock(ResultPartitionManager.class),
 			notifier,
-			ioManager,
+			new NoOpIOManager(),
 			sendScheduleOrUpdateConsumersMessage);
 	}
 }
