IGNITE-9025 "PDS 1" TC configuration could hang because of SegmentedRingByteBufferTest

Signed-off-by: Andrey Gura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/43407be1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/43407be1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/43407be1

Branch: refs/heads/ignite-8783
Commit: 43407be1c23f85d06ac7c28651a301f06395309e
Parents: 85b2002
Author: EdShangGG <[email protected]>
Authored: Tue Jul 17 20:44:37 2018 +0300
Committer: Andrey Gura <[email protected]>
Committed: Tue Jul 17 20:44:37 2018 +0300

----------------------------------------------------------------------
 .../wal/SegmentedRingByteBufferTest.java        | 257 +++++++++++--------
 1 file changed, 144 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/43407be1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
index 72563a2..87f3fa7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java
@@ -28,14 +28,17 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import junit.framework.TestCase;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.ONHEAP;
@@ -43,7 +46,7 @@ import static 
org.apache.ignite.internal.processors.cache.persistence.wal.Segmen
 /**
  *
  */
-public class SegmentedRingByteBufferTest extends TestCase {
+public class SegmentedRingByteBufferTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
@@ -296,7 +299,7 @@ public class SegmentedRingByteBufferTest extends TestCase {
     /**
      * @param mode Mode.
      */
-    private void 
doTestNoOverflowMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws 
org.apache.ignite.IgniteCheckedException {
+    private void 
doTestNoOverflowMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws 
org.apache.ignite.IgniteCheckedException, BrokenBarrierException, 
InterruptedException {
         int producerCnt = 16;
 
         final int cap = 256 * 1024;
@@ -305,100 +308,99 @@ public class SegmentedRingByteBufferTest extends 
TestCase {
 
         final AtomicBoolean stop = new AtomicBoolean(false);
 
-        final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
+        final AtomicReference<Throwable> ex = new AtomicReference<>();
 
-        final AtomicLong totalWritten = new AtomicLong();
+        final CyclicBarrier startBarrier = new CyclicBarrier(producerCnt);
 
-        final AtomicInteger cnt = new AtomicInteger();
+        final CyclicBarrier restartBarrier = new CyclicBarrier(producerCnt + 
1);
 
-        final Object mux = new Object();
+        final AtomicLong totalWritten = new AtomicLong();
 
         IgniteInternalFuture<Long> fut;
 
         try {
             fut = GridTestUtils.runMultiThreadedAsync(() -> {
                 try {
-                    barrier.await();
-                }
-                catch (InterruptedException | BrokenBarrierException e) {
-                    e.printStackTrace();
+                    try {
+                        startBarrier.await();
+                    }
+                    catch (InterruptedException | BrokenBarrierException e) {
+                        e.printStackTrace();
 
-                    fail();
-                }
+                        fail();
+                    }
 
-                while (!stop.get()) {
-                    TestObject obj = new TestObject();
+                    while (!stop.get()) {
 
-                    SegmentedRingByteBuffer.WriteSegment seg = 
buf.offer(obj.size());
-                    ByteBuffer bbuf;
+                        TestObject obj = new TestObject();
 
-                    if (seg == null) {
-                        cnt.incrementAndGet();
+                        SegmentedRingByteBuffer.WriteSegment seg = 
buf.offer(obj.size());
 
-                        synchronized (mux) {
-                            try {
-                                if (stop.get())
-                                    break;
+                        ByteBuffer bbuf;
 
-                                mux.wait();
-                            }
-                            catch (InterruptedException e) {
-                                e.printStackTrace();
+                        if (seg == null) {
+                            try {
+                                restartBarrier.await(getTestTimeout(), 
TimeUnit.MILLISECONDS);
+                            } catch (InterruptedException | TimeoutException | 
BrokenBarrierException e) {
+                                break;
                             }
-                        }
-
-                        cnt.decrementAndGet();
 
-                        continue;
-                    }
+                            continue;
+                        }
 
-                    bbuf = seg.buffer();
+                        bbuf = seg.buffer();
 
-                    assertEquals(obj.size(), bbuf.remaining());
+                        assertEquals(obj.size(), bbuf.remaining());
 
-                    bbuf.putLong(obj.id);
-                    bbuf.putInt(obj.len);
-                    bbuf.put(obj.arr);
+                        bbuf.putLong(obj.id);
+                        bbuf.putInt(obj.len);
+                        bbuf.put(obj.arr);
 
-                    assertEquals(0, bbuf.remaining());
+                        assertEquals(0, bbuf.remaining());
 
-                    seg.release();
+                        seg.release();
 
-                    long total = totalWritten.addAndGet(obj.size());
+                        long total = totalWritten.addAndGet(obj.size());
 
-                    assertTrue(total <= cap);
+                        assertTrue(total <= cap);
+                    }
+                }
+                catch (Throwable th) {
+                    ex.compareAndSet(null, th);
                 }
             }, producerCnt, "producer-thread");
 
             long endTime = System.currentTimeMillis() + 60 * 1000L;
 
-            while (System.currentTimeMillis() < endTime) {
-
-                while (cnt.get() < producerCnt)
+            while (System.currentTimeMillis() < endTime && ex.get() == null) {
+                while (restartBarrier.getNumberWaiting() != producerCnt && 
ex.get() == null)
                     U.sleep(10);
 
-                synchronized (mux) {
-                    List<SegmentedRingByteBuffer.ReadSegment> segs = 
buf.poll();
+                if (ex.get() != null)
+                    fail("Exception in producer thread, ex=" + ex.get());
 
-                    if (segs != null) {
-                        for (SegmentedRingByteBuffer.ReadSegment seg : segs)
-                            seg.release();
-                    }
+                List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();
 
-                    totalWritten.set(0);
-
-                    mux.notifyAll();
+                if (segs != null) {
+                    for (SegmentedRingByteBuffer.ReadSegment seg : segs)
+                        seg.release();
                 }
-            }
-        } finally {
-            synchronized (mux) {
-                stop.set(true);
 
-                mux.notifyAll();
+                totalWritten.set(0);
+
+                restartBarrier.await();
             }
         }
+        finally {
+            stop.set(true);
+
+            restartBarrier.reset();
+        }
 
         fut.get();
+
+        if (ex.get() != null)
+            fail("Exception in producer thread, ex=" + ex.get());
     }
 
     /**
@@ -413,6 +415,8 @@ public class SegmentedRingByteBufferTest extends TestCase {
 
         final AtomicBoolean stop = new AtomicBoolean(false);
 
+        final AtomicReference<Throwable> ex = new AtomicReference<>();
+
         final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
 
         IgniteInternalFuture<Long> fut;
@@ -420,41 +424,50 @@ public class SegmentedRingByteBufferTest extends TestCase 
{
         try {
             fut = GridTestUtils.runMultiThreadedAsync(() -> {
                 try {
-                    barrier.await();
-                }
-                catch (InterruptedException | BrokenBarrierException e) {
-                    e.printStackTrace();
+                    try {
+                        barrier.await();
+                    }
+                    catch (InterruptedException | BrokenBarrierException e) {
+                        e.printStackTrace();
 
-                    fail();
-                }
+                        fail();
+                    }
 
-                while (!stop.get()) {
-                    TestObject obj = new TestObject();
+                    while (!stop.get()) {
+                        TestObject obj = new TestObject();
 
-                    SegmentedRingByteBuffer.WriteSegment seg;
-                    ByteBuffer bbuf;
+                        SegmentedRingByteBuffer.WriteSegment seg;
+                        ByteBuffer bbuf;
 
-                    for (;;) {
-                        if (stop.get())
-                            return;
+                        for (;;) {
+                            if (stop.get())
+                                return;
 
-                        seg = buf.offer(obj.size());
+                            seg = buf.offer(obj.size());
 
-                        if (seg != null)
-                            break;
-                    }
+                            if (seg != null)
+                                break;
+                        }
 
-                    bbuf = seg.buffer();
+                        try {
+                            bbuf = seg.buffer();
 
-                    assertEquals(obj.size(), bbuf.remaining());
+                            assertEquals(obj.size(), bbuf.remaining());
 
-                    bbuf.putLong(obj.id);
-                    bbuf.putInt(obj.len);
-                    bbuf.put(obj.arr);
+                            bbuf.putLong(obj.id);
+                            bbuf.putInt(obj.len);
+                            bbuf.put(obj.arr);
 
-                    assertEquals(0, bbuf.remaining());
+                            assertEquals(0, bbuf.remaining());
 
-                    seg.release();
+                        }
+                        finally {
+                            seg.release();
+                        }
+                    }
+                }
+                catch (Throwable th) {
+                    ex.compareAndSet(null, th);
                 }
             }, producerCnt, "producer-thread");
 
@@ -462,7 +475,7 @@ public class SegmentedRingByteBufferTest extends TestCase {
 
             long endTime = System.currentTimeMillis() + 60 * 1000L;
 
-            while (System.currentTimeMillis() < endTime) {
+            while (System.currentTimeMillis() < endTime && ex.get() == null) {
                 try {
                     U.sleep(rnd.nextInt(100) + 1);
                 }
@@ -480,11 +493,15 @@ public class SegmentedRingByteBufferTest extends TestCase 
{
                     }
                 }
             }
-        } finally {
+        }
+        finally {
             stop.set(true);
         }
 
         fut.get();
+
+        if (ex.get() != null)
+            fail("Exception in producer thread, ex=" + ex.get());
     }
 
     /**
@@ -497,6 +514,8 @@ public class SegmentedRingByteBufferTest extends TestCase {
 
         final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, 
Long.MAX_VALUE, mode);
 
+        final AtomicReference<Throwable> ex = new AtomicReference<>();
+
         final AtomicBoolean stop = new AtomicBoolean(false);
 
         final CyclicBarrier barrier = new CyclicBarrier(producerCnt);
@@ -508,43 +527,51 @@ public class SegmentedRingByteBufferTest extends TestCase 
{
         try {
             fut = GridTestUtils.runMultiThreadedAsync(() -> {
                 try {
-                    barrier.await();
-                }
-                catch (InterruptedException | BrokenBarrierException e) {
-                    e.printStackTrace();
-
-                    fail();
-                }
+                    try {
+                        barrier.await();
+                    }
+                    catch (InterruptedException | BrokenBarrierException e) {
+                        e.printStackTrace();
 
-                while (!stop.get()) {
-                    TestObject obj = new TestObject();
+                        fail();
+                    }
 
-                    SegmentedRingByteBuffer.WriteSegment seg;
-                    ByteBuffer bbuf;
+                    while (!stop.get()) {
+                        TestObject obj = new TestObject();
 
-                    for (;;) {
-                        if (stop.get())
-                            return;
+                        SegmentedRingByteBuffer.WriteSegment seg;
+                        ByteBuffer bbuf;
 
-                        seg = buf.offer(obj.size());
+                        for (;;) {
+                            if (stop.get())
+                                return;
 
-                        if (seg != null)
-                            break;
-                    }
+                            seg = buf.offer(obj.size());
 
-                    bbuf = seg.buffer();
+                            if (seg != null)
+                                break;
+                        }
 
-                    assertEquals(obj.size(), bbuf.remaining());
+                        try {
+                            bbuf = seg.buffer();
 
-                    bbuf.putLong(obj.id);
-                    bbuf.putInt(obj.len);
-                    bbuf.put(obj.arr);
+                            assertEquals(obj.size(), bbuf.remaining());
 
-                    assertEquals(0, bbuf.remaining());
+                            bbuf.putLong(obj.id);
+                            bbuf.putInt(obj.len);
+                            bbuf.put(obj.arr);
 
-                    assertTrue("Ooops! The same value is already exist in Set! 
", items.add(obj));
+                            assertEquals(0, bbuf.remaining());
 
-                    seg.release();
+                            assertTrue("Ooops! The same value is already exist 
in Set! ", items.add(obj));
+                        }
+                        finally {
+                            seg.release();
+                        }
+                    }
+                }
+                catch (Throwable th) {
+                    ex.compareAndSet(null, th);
                 }
             }, producerCnt, "producer-thread");
 
@@ -552,7 +579,7 @@ public class SegmentedRingByteBufferTest extends TestCase {
 
             long endTime = System.currentTimeMillis() + 60 * 1000L;
 
-            while (System.currentTimeMillis() < endTime) {
+            while (System.currentTimeMillis() < endTime && ex.get() == null) {
                 try {
                     U.sleep(rnd.nextInt(100) + 1);
                 }
@@ -613,12 +640,16 @@ public class SegmentedRingByteBufferTest extends TestCase 
{
                         seg.release();
                 }
             }
-        } finally {
+        }
+        finally {
             stop.set(true);
         }
 
         fut.get();
 
+        if (ex.get() != null)
+            fail("Exception in producer thread, ex=" + ex.get());
+
         List<SegmentedRingByteBuffer.ReadSegment> segs;
 
         while ((segs = buf.poll()) != null) {

Reply via email to