Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1270#discussion_r43203880
--- Diff:
flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
---
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.runtime.io.disk.iomanager;
+
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.runtime.io.disk.iomanager.*;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.IntValue;
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class IOManagerPerformanceBenchmark {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
+
+ @Param({"4096", "16384", "524288"})
+ private int segmentSizesAligned;
+
+ @Param({"3862", "16895", "500481"})
+ private int segmentSizesUnaligned;
+
+ @Param({"1", "2", "4", "6"})
+ private int numSegment;
+
+ private static int numBlocks;
+
+ private static final long MEMORY_SIZE = 32 * 1024 * 1024;
+
+ private static final int NUM_INTS_WRITTEN = 100000000;
+
+ private static final AbstractInvokable memoryOwner = new
DummyInvokable();
+
+ private MemoryManager memManager;
+
+ private IOManager ioManager;
+
+ private static FileIOChannel.ID fileIOChannel;
+
+ private static File ioManagerTempFile1;
+
+ private static File ioManagerTempFile2;
+
+ private static File speedTestNIOTempFile1;
+
+ private static File speedTestNIOTempFile2;
+
+ private static File speedTestNIOTempFile3;
+
+ private static File speedTestNIOTempFile4;
+
+
+ @Setup
+ public void startup() throws Exception {
+ memManager = new MemoryManager(MEMORY_SIZE, 1);
+ ioManager = new IOManagerAsync();
+ this.testChannelWriteWithSegments(numSegment);
+ ioManagerTempFile1 =
this.createReadTempFile(segmentSizesAligned);
+ ioManagerTempFile2 =
this.createReadTempFile(segmentSizesUnaligned);
+ speedTestNIOTempFile1 =
creatSpeedTestNIOTempFile(segmentSizesAligned, true);
+ speedTestNIOTempFile2 =
creatSpeedTestNIOTempFile(segmentSizesAligned, false);
+ speedTestNIOTempFile3 =
creatSpeedTestNIOTempFile(segmentSizesUnaligned, true);
+ speedTestNIOTempFile4 =
creatSpeedTestNIOTempFile(segmentSizesUnaligned, false);
+
+ }
+
+ @TearDown
+ public void afterTest() throws Exception {
+ ioManager.shutdown();
+ Assert.assertTrue("IO Manager has not properly shut down.",
ioManager.isProperlyShutDown());
+
+ Assert.assertTrue("Not all memory was returned to the memory
manager in the test.", memManager.verifyEmpty());
+ memManager.shutdown();
+ memManager = null;
+ }
+
+// ------------------------------------------------------------------------
+
+ private File createReadTempFile(int bufferSize) throws IOException {
+ final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+ final IntValue rec = new IntValue(0);
+
+ File tempFile = null;
+ DataOutputStream daos = null;
+
+ try {
+ tempFile = new File(tmpChannel.getPath());
+
+ FileOutputStream fos = new FileOutputStream(tempFile);
+ daos = new DataOutputStream(new
BufferedOutputStream(fos, bufferSize));
+
+ int valsLeft = NUM_INTS_WRITTEN;
+ while (valsLeft-- > 0) {
+ rec.setValue(valsLeft);
+ rec.write(new
OutputViewDataOutputStreamWrapper(daos));
+ }
+ daos.close();
+ daos = null;
+ }
+ finally {
+ // close if possible
+ if (daos != null) {
+ daos.close();
+ }
+ }
+ return tempFile;
+ }
+
+ @SuppressWarnings("resource")
+ private File creatSpeedTestNIOTempFile(int bufferSize, boolean direct)
throws IOException
+ {
+ final FileIOChannel.ID tmpChannel = ioManager.createChannel();
+
+ File tempFile = null;
+ FileChannel fs = null;
+
+ try {
+ tempFile = new File(tmpChannel.getPath());
+
+ RandomAccessFile raf = new RandomAccessFile(tempFile,
"rw");
+ fs = raf.getChannel();
+
+ ByteBuffer buf = direct ?
ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);
+
+ int valsLeft = NUM_INTS_WRITTEN;
+ while (valsLeft-- > 0) {
+ if (buf.remaining() < 4) {
+ buf.flip();
+ fs.write(buf);
+ buf.clear();
+ }
+ buf.putInt(valsLeft);
+ }
+
+ if (buf.position() > 0) {
+ buf.flip();
+ fs.write(buf);
+ }
+
+ fs.close();
+ raf.close();
+ fs = null;
+ }
+ finally {
+ // close if possible
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+ }
+ return tempFile;
+ }
+
+ @Benchmark
+ public void speedTestOutputManager() throws Exception
+ {
+ LOG.info("Starting speed test with IO Manager...");
+
+ testChannelWriteWithSegments(numSegment);
+ }
+
+ @Benchmark
+ public void speedTestInputManager() throws Exception
+ {
+ LOG.info("Starting speed test with IO Manager...");
+
+ testChannelReadWithSegments(numSegment);
+ }
+
+ private void testChannelWriteWithSegments(int numSegments) throws
Exception
+ {
+ final List<MemorySegment> memory =
this.memManager.allocatePages(memoryOwner, numSegments);
+ final FileIOChannel.ID channel = this.ioManager.createChannel();
+
+ BlockChannelWriter<MemorySegment> writer = null;
+
+ try {
+ writer =
this.ioManager.createBlockChannelWriter(channel);
+ final ChannelWriterOutputView out = new
ChannelWriterOutputView(writer, memory, this.memManager.getPageSize());
+
+ int valsLeft = NUM_INTS_WRITTEN;
+ while (valsLeft-- > 0) {
+ out.writeInt(valsLeft);
+ }
+
+ fileIOChannel = channel;
+ out.close();
+ numBlocks = out.getBlockCount();
+
+ writer.close();
+ writer = null;
+
+ memManager.release(memory);
+ }
+ finally {
+ if (writer != null) {
+ writer.closeAndDelete();
+ }
+ }
+ }
+
+ private void testChannelReadWithSegments(int numSegments) throws
Exception
+ {
+ final List<MemorySegment> memory =
this.memManager.allocatePages(memoryOwner, numSegments);
+ //final FileIOChannel.ID channel =
this.ioManager.createChannel();
--- End diff --
Can this commented-out line be removed?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---