IGNITE-9025 "PDS 1" TC configuration could hang because of SegmentedRingByteBufferTest
Signed-off-by: Andrey Gura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/43407be1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/43407be1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/43407be1 Branch: refs/heads/ignite-8783 Commit: 43407be1c23f85d06ac7c28651a301f06395309e Parents: 85b2002 Author: EdShangGG <[email protected]> Authored: Tue Jul 17 20:44:37 2018 +0300 Committer: Andrey Gura <[email protected]> Committed: Tue Jul 17 20:44:37 2018 +0300 ---------------------------------------------------------------------- .../wal/SegmentedRingByteBufferTest.java | 257 +++++++++++-------- 1 file changed, 144 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/43407be1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java index 72563a2..87f3fa7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentedRingByteBufferTest.java @@ -28,14 +28,17 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import junit.framework.TestCase; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT; import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.ONHEAP; @@ -43,7 +46,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.Segmen /** * */ -public class SegmentedRingByteBufferTest extends TestCase { +public class SegmentedRingByteBufferTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ @@ -296,7 +299,7 @@ public class SegmentedRingByteBufferTest extends TestCase { /** * @param mode Mode. */ - private void doTestNoOverflowMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException { + private void doTestNoOverflowMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException, BrokenBarrierException, InterruptedException { int producerCnt = 16; final int cap = 256 * 1024; @@ -305,100 +308,99 @@ public class SegmentedRingByteBufferTest extends TestCase { final AtomicBoolean stop = new AtomicBoolean(false); - final CyclicBarrier barrier = new CyclicBarrier(producerCnt); + final AtomicReference<Throwable> ex = new AtomicReference<>(); - final AtomicLong totalWritten = new AtomicLong(); + final CyclicBarrier startBarrier = new CyclicBarrier(producerCnt); - final AtomicInteger cnt = new AtomicInteger(); + final CyclicBarrier restartBarrier = new CyclicBarrier(producerCnt + 1); - final Object mux = new Object(); + final AtomicLong totalWritten = new AtomicLong(); IgniteInternalFuture<Long> fut; try { fut = GridTestUtils.runMultiThreadedAsync(() -> { try { - barrier.await(); - } - catch (InterruptedException | BrokenBarrierException e) { - e.printStackTrace(); + try { + startBarrier.await(); + } + catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); - fail(); - } + fail(); + } - while (!stop.get()) { - TestObject obj = new TestObject(); + while (!stop.get()) { - SegmentedRingByteBuffer.WriteSegment seg = buf.offer(obj.size()); - ByteBuffer bbuf; + TestObject obj = new TestObject(); - if (seg == null) { - cnt.incrementAndGet(); + SegmentedRingByteBuffer.WriteSegment seg = buf.offer(obj.size()); - synchronized (mux) { - try { - if (stop.get()) - break; + ByteBuffer bbuf; - mux.wait(); - } - catch (InterruptedException e) { - e.printStackTrace(); + if (seg == null) { + try { + restartBarrier.await(getTestTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException | TimeoutException | BrokenBarrierException e) { + break; } - } - - cnt.decrementAndGet(); - continue; - } + continue; + } - bbuf = seg.buffer(); + bbuf = seg.buffer(); - assertEquals(obj.size(), bbuf.remaining()); + assertEquals(obj.size(), bbuf.remaining()); - bbuf.putLong(obj.id); - bbuf.putInt(obj.len); - bbuf.put(obj.arr); + bbuf.putLong(obj.id); + bbuf.putInt(obj.len); + bbuf.put(obj.arr); - assertEquals(0, bbuf.remaining()); + assertEquals(0, bbuf.remaining()); - seg.release(); + seg.release(); - long total = totalWritten.addAndGet(obj.size()); + long total = totalWritten.addAndGet(obj.size()); - assertTrue(total <= cap); + assertTrue(total <= cap); + } + } + catch (Throwable th) { + ex.compareAndSet(null, th); } }, producerCnt, "producer-thread"); long endTime = System.currentTimeMillis() + 60 * 1000L; - while (System.currentTimeMillis() < endTime) { - - while (cnt.get() < producerCnt) + while (System.currentTimeMillis() < endTime && ex.get() == null) { + while (restartBarrier.getNumberWaiting() != producerCnt && ex.get() == null) U.sleep(10); - synchronized (mux) { - List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(); + if (ex.get() != null) + fail("Exception in producer thread, ex=" + ex.get()); - if (segs != null) { - for (SegmentedRingByteBuffer.ReadSegment seg : segs) - seg.release(); - } + List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(); - totalWritten.set(0); - - mux.notifyAll(); + if (segs != null) { + for (SegmentedRingByteBuffer.ReadSegment seg : segs) + seg.release(); } - } - } finally { - synchronized (mux) { - stop.set(true); - mux.notifyAll(); + totalWritten.set(0); + + restartBarrier.await(); } } + finally { + stop.set(true); + + restartBarrier.reset(); + } fut.get(); + + if (ex.get() != null) + fail("Exception in producer thread, ex=" + ex.get()); } /** @@ -413,6 +415,8 @@ public class SegmentedRingByteBufferTest extends TestCase { final AtomicBoolean stop = new AtomicBoolean(false); + final AtomicReference<Throwable> ex = new AtomicReference<>(); + final CyclicBarrier barrier = new CyclicBarrier(producerCnt); IgniteInternalFuture<Long> fut; @@ -420,41 +424,50 @@ public class SegmentedRingByteBufferTest extends TestCase { try { fut = GridTestUtils.runMultiThreadedAsync(() -> { try { - barrier.await(); - } - catch (InterruptedException | BrokenBarrierException e) { - e.printStackTrace(); + try { + barrier.await(); + } + catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); - fail(); - } + fail(); + } - while (!stop.get()) { - TestObject obj = new TestObject(); + while (!stop.get()) { + TestObject obj = new TestObject(); - SegmentedRingByteBuffer.WriteSegment seg; - ByteBuffer bbuf; + SegmentedRingByteBuffer.WriteSegment seg; + ByteBuffer bbuf; - for (;;) { - if (stop.get()) - return; + for (;;) { + if (stop.get()) + return; - seg = buf.offer(obj.size()); + seg = buf.offer(obj.size()); - if (seg != null) - break; - } + if (seg != null) + break; + } - bbuf = seg.buffer(); + try { + bbuf = seg.buffer(); - assertEquals(obj.size(), bbuf.remaining()); + assertEquals(obj.size(), bbuf.remaining()); - bbuf.putLong(obj.id); - bbuf.putInt(obj.len); - bbuf.put(obj.arr); + bbuf.putLong(obj.id); + bbuf.putInt(obj.len); + bbuf.put(obj.arr); - assertEquals(0, bbuf.remaining()); + assertEquals(0, bbuf.remaining()); - seg.release(); + } + finally { + seg.release(); + } + } + } + catch (Throwable th) { + ex.compareAndSet(null, th); } }, producerCnt, "producer-thread"); @@ -462,7 +475,7 @@ public class SegmentedRingByteBufferTest extends TestCase { long endTime = System.currentTimeMillis() + 60 * 1000L; - while (System.currentTimeMillis() < endTime) { + while (System.currentTimeMillis() < endTime && ex.get() == null) { try { U.sleep(rnd.nextInt(100) + 1); } @@ -480,11 +493,15 @@ public class SegmentedRingByteBufferTest extends TestCase { } } } - } finally { + } + finally { stop.set(true); } fut.get(); + + if (ex.get() != null) + fail("Exception in producer thread, ex=" + ex.get()); } /** @@ -497,6 +514,8 @@ public class SegmentedRingByteBufferTest extends TestCase { final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode); + final AtomicReference<Throwable> ex = new AtomicReference<>(); + final AtomicBoolean stop = new AtomicBoolean(false); final CyclicBarrier barrier = new CyclicBarrier(producerCnt); @@ -508,43 +527,51 @@ public class SegmentedRingByteBufferTest extends TestCase { try { fut = GridTestUtils.runMultiThreadedAsync(() -> { try { - barrier.await(); - } - catch (InterruptedException | BrokenBarrierException e) { - e.printStackTrace(); - - fail(); - } + try { + barrier.await(); + } + catch (InterruptedException | BrokenBarrierException e) { + e.printStackTrace(); - while (!stop.get()) { - TestObject obj = new TestObject(); + fail(); + } - SegmentedRingByteBuffer.WriteSegment seg; - ByteBuffer bbuf; + while (!stop.get()) { + TestObject obj = new TestObject(); - for (;;) { - if (stop.get()) - return; + SegmentedRingByteBuffer.WriteSegment seg; + ByteBuffer bbuf; - seg = buf.offer(obj.size()); + for (;;) { + if (stop.get()) + return; - if (seg != null) - break; - } + seg = buf.offer(obj.size()); - bbuf = seg.buffer(); + if (seg != null) + break; + } - assertEquals(obj.size(), bbuf.remaining()); + try { + bbuf = seg.buffer(); - bbuf.putLong(obj.id); - bbuf.putInt(obj.len); - bbuf.put(obj.arr); + assertEquals(obj.size(), bbuf.remaining()); - assertEquals(0, bbuf.remaining()); + bbuf.putLong(obj.id); + bbuf.putInt(obj.len); + bbuf.put(obj.arr); - assertTrue("Ooops! The same value is already exist in Set! ", items.add(obj)); + assertEquals(0, bbuf.remaining()); - seg.release(); + assertTrue("Ooops! The same value is already exist in Set! ", items.add(obj)); + } + finally { + seg.release(); + } + } + } + catch (Throwable th) { + ex.compareAndSet(null, th); } }, producerCnt, "producer-thread"); @@ -552,7 +579,7 @@ public class SegmentedRingByteBufferTest extends TestCase { long endTime = System.currentTimeMillis() + 60 * 1000L; - while (System.currentTimeMillis() < endTime) { + while (System.currentTimeMillis() < endTime && ex.get() == null) { try { U.sleep(rnd.nextInt(100) + 1); } @@ -613,12 +640,16 @@ public class SegmentedRingByteBufferTest extends TestCase { seg.release(); } } - } finally { + } + finally { stop.set(true); } fut.get(); + if (ex.get() != null) + fail("Exception in producer thread, ex=" + ex.get()); + List<SegmentedRingByteBuffer.ReadSegment> segs; while ((segs = buf.poll()) != null) {
