[hotfix] [network] [tests] Make AwaitableBufferAvailablityListener thread-safe

The listener is called asynchronously by the spill writer, so incrementing the
counter may need synchronization, and the plain long field definitely had
visibility issues across threads. Using an AtomicLong addresses both.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49623980
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49623980
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49623980

Branch: refs/heads/master
Commit: 496239806e68535c9c8291e320f8886d3a1b8709
Parents: 30eb8cd
Author: Nico Kruber <n...@data-artisans.com>
Authored: Fri Feb 23 10:35:41 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 9 16:49:40 2018 +0100

----------------------------------------------------------------------
 .../partition/AwaitableBufferAvailablityListener.java   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49623980/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
index 2b6b834..6cf9d64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java
@@ -18,29 +18,31 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Test implementation of {@link BufferAvailabilityListener}.
  */
 class AwaitableBufferAvailablityListener implements BufferAvailabilityListener 
{
 
-       private long numNotifications;
+       private final AtomicLong numNotifications = new AtomicLong();
 
        @Override
        public void notifyDataAvailable() {
-               ++numNotifications;
+               numNotifications.getAndIncrement();
        }
 
        public long getNumNotifications() {
-               return numNotifications;
+               return numNotifications.get();
        }
 
        public void resetNotificationCounters() {
-               numNotifications = 0;
+               numNotifications.set(0L);
        }
 
        void awaitNotifications(long awaitedNumNotifications, long 
timeoutMillis) throws InterruptedException {
                long deadline = System.currentTimeMillis() + timeoutMillis;
-               while (numNotifications < awaitedNumNotifications && 
System.currentTimeMillis() < deadline) {
+               while (numNotifications.get() < awaitedNumNotifications && 
System.currentTimeMillis() < deadline) {
                        Thread.sleep(1);
                }
        }

Reply via email to