This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2da9ac4b2108ec13ac97a8aff90c43449b2efb4e Author: Piotr Nowojski <[email protected]> AuthorDate: Wed May 20 15:17:24 2020 +0200 [FLINK-17842][network] Fix performance regression in SpanningWrapper#clear For some reason the following commit: 54155744bd [FLINK-17547][task] Use RefCountedFile in SpanningWrapper caused a performance regression in various benchmarks. It's hard to tell why as none of the benchmarks are using spill files (records are too small), so our best guess is that combination of AtomicInteger inside RefCountedFile plus NullPointerException handling messed up with JIT ability to get rid of the memory barrier (from AtomicInteger) on the hot path. --- .../io/network/api/serialization/SpanningWrapper.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java index 45d6ad7..7e3a2da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java @@ -50,7 +50,7 @@ import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningW import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES; import static org.apache.flink.util.CloseableIterator.empty; import static org.apache.flink.util.FileUtils.writeCompletely; -import static org.apache.flink.util.IOUtils.closeAllQuietly; +import static org.apache.flink.util.IOUtils.closeQuietly; final class SpanningWrapper { @@ -249,7 +249,17 @@ final class SpanningWrapper { leftOverLimit = 0; accumulatedRecordBytes = 0; - closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.release()); + if (spillingChannel != null) { + closeQuietly(spillingChannel); + } + if (spillFileReader != null) { + closeQuietly(spillFileReader); + } + if (spillFile != null) { + // It's important to avoid AtomicInteger access inside `release()` on the hot path + closeQuietly(() -> spillFile.release()); + } + spillingChannel = null; spillFileReader = null; spillFile = null;
