[hotfix] [network] [tests] Make AwaitableBufferAvailablityListener thread-safe
This is called asynchronously by the spill writer and thus may need synchronization on incrementing the counter, but it definitely had visibility issues with the counter. Using an AtomicLong fixes that. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49623980 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49623980 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49623980 Branch: refs/heads/master Commit: 496239806e68535c9c8291e320f8886d3a1b8709 Parents: 30eb8cd Author: Nico Kruber <n...@data-artisans.com> Authored: Fri Feb 23 10:35:41 2018 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Mar 9 16:49:40 2018 +0100 ---------------------------------------------------------------------- .../partition/AwaitableBufferAvailablityListener.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/49623980/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java index 2b6b834..6cf9d64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java @@ -18,29 +18,31 @@ package org.apache.flink.runtime.io.network.partition; +import java.util.concurrent.atomic.AtomicLong; + /** * Test implementation of {@link BufferAvailabilityListener}. 
*/ class AwaitableBufferAvailablityListener implements BufferAvailabilityListener { - private long numNotifications; + private final AtomicLong numNotifications = new AtomicLong(); @Override public void notifyDataAvailable() { - ++numNotifications; + numNotifications.getAndIncrement(); } public long getNumNotifications() { - return numNotifications; + return numNotifications.get(); } public void resetNotificationCounters() { - numNotifications = 0; + numNotifications.set(0L); } void awaitNotifications(long awaitedNumNotifications, long timeoutMillis) throws InterruptedException { long deadline = System.currentTimeMillis() + timeoutMillis; - while (numNotifications < awaitedNumNotifications && System.currentTimeMillis() < deadline) { + while (numNotifications.get() < awaitedNumNotifications && System.currentTimeMillis() < deadline) { Thread.sleep(1); } }