Repository: oozie Updated Branches: refs/heads/master 059462650 -> 526dbd198
OOZIE-3237 Flaky test TestZKLocksService#testWriteReadLockThreads (pbacsko via andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/526dbd19 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/526dbd19 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/526dbd19 Branch: refs/heads/master Commit: 526dbd19814877af15a08b7b3a3b164a6737f9df Parents: 0594626 Author: Andras Piros <andras.pi...@cloudera.com> Authored: Tue May 22 19:30:25 2018 +0200 Committer: Andras Piros <andras.pi...@cloudera.com> Committed: Tue May 22 19:30:25 2018 +0200 ---------------------------------------------------------------------- .../org/apache/oozie/lock/TestMemoryLocks.java | 104 ++++-------- .../oozie/service/TestZKLocksService.java | 167 ++++++++----------- .../test/java/org/apache/oozie/util/Locker.java | 85 ++++++++++ .../apache/oozie/util/LockerCoordinator.java | 71 ++++++++ release-log.txt | 1 + 5 files changed, 257 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java index 8c7b58e..1b7bcbd 100644 --- a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java +++ b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java @@ -19,18 +19,17 @@ package org.apache.oozie.lock; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import org.apache.oozie.service.MemoryLocksService; import org.apache.oozie.service.MemoryLocksService.Type; +import org.apache.oozie.test.XTestCase; import org.apache.oozie.service.ServiceException; import org.apache.oozie.service.Services; -import org.apache.oozie.test.XTestCase; +import org.apache.oozie.util.LockerCoordinator; +import org.apache.oozie.util.Locker; import org.apache.oozie.util.XLog; public class TestMemoryLocks extends XTestCase { - private static final int LATCH_TIMEOUT = 10; private XLog log = XLog.getLog(getClass()); public static final int DEFAULT_LOCK_TIMEOUT = 5 * 1000; @@ -46,72 +45,6 @@ public class TestMemoryLocks extends XTestCase { super.tearDown(); } - public abstract class LatchHandler { - protected CountDownLatch startLatch = new CountDownLatch(1); - protected CountDownLatch acquireLockLatch = new CountDownLatch(1); - protected CountDownLatch proceedingLatch = new CountDownLatch(1); - protected CountDownLatch terminationLatch = new CountDownLatch(1); - - public void awaitStart() throws InterruptedException { - startLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS); - } - - public void awaitTermination() throws InterruptedException { - terminationLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS); - } - - public void awaitLockAcquire() throws InterruptedException { - acquireLockLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS); - } - - public void proceed() { - proceedingLatch.countDown(); - } - } - - public abstract class Locker extends LatchHandler implements Runnable { - protected String name; - private String nameIndex; - private StringBuffer sb; - protected long timeout; - - public Locker(String name, int nameIndex, long timeout, StringBuffer buffer) { - this.name = name; - this.nameIndex = name + ":" + nameIndex; - this.sb = buffer; - this.timeout = timeout; - } - - public void run() { - try { - log.info("Getting lock [{0}]", nameIndex); - startLatch.countDown(); - MemoryLocks.MemoryLockToken token = getLock(); - if (token != null) { - log.info("Got lock [{0}]", nameIndex); - sb.append(nameIndex + "-L "); - - acquireLockLatch.countDown(); - proceedingLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS); - - sb.append(nameIndex + "-U "); - token.release(); - log.info("Release lock [{0}]", nameIndex); - } - else { - proceedingLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS); - sb.append(nameIndex + "-N "); - log.info("Did not get lock [{0}]", nameIndex); - } - terminationLatch.countDown(); - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - protected abstract MemoryLocks.MemoryLockToken getLock() throws InterruptedException; - } public class ReadLocker extends Locker { @@ -119,6 +52,7 @@ public class TestMemoryLocks extends XTestCase { super(name, nameIndex, timeout, buffer); } + @Override protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException { return locks.getLock(name, Type.READ, timeout); } @@ -130,6 +64,7 @@ public class TestMemoryLocks extends XTestCase { super(name, nameIndex, timeout, buffer); } + @Override protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException { return locks.getLock(name, Type.WRITE, timeout); } @@ -275,11 +210,12 @@ public class TestMemoryLocks extends XTestCase { assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim()); } - public class SameThreadWriteLocker extends LatchHandler implements Runnable { + public class SameThreadWriteLocker implements Runnable { protected String name; private String nameIndex; private StringBuffer sb; protected long timeout; + private final LockerCoordinator coordinator = new LockerCoordinator(); public SameThreadWriteLocker(String name, int nameIndex, long timeout, StringBuffer buffer) { this.name = name; @@ -290,13 +226,13 @@ public class TestMemoryLocks extends XTestCase { public void run() { try { - startLatch.countDown(); + coordinator.startDone(); log.info("Getting lock [{0}]", nameIndex); MemoryLocks.MemoryLockToken token = getLock(); MemoryLocks.MemoryLockToken token2 = getLock(); if (token != null) { - acquireLockLatch.countDown(); + coordinator.lockAcquireDone(); log.info("Got lock [{0}]", nameIndex); sb.append(nameIndex + "-L1 "); @@ -305,7 +241,7 @@ public class TestMemoryLocks extends XTestCase { } sb.append(nameIndex + "-U1 "); - proceedingLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS); + coordinator.awaitContinueSignal(); token.release(); sb.append(nameIndex + "-U2 "); @@ -313,17 +249,33 @@ public class TestMemoryLocks extends XTestCase { log.info("Release lock [{0}]", nameIndex); } else { - proceedingLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS); + coordinator.awaitContinueSignal(); sb.append(nameIndex + "-N "); log.info("Did not get lock [{0}]", nameIndex); } - terminationLatch.countDown(); + coordinator.terminated(); } catch (Exception ex) { throw new RuntimeException(ex); } } + public void awaitLockAcquire() throws InterruptedException { + coordinator.awaitLockAcquire(); + } + + public void awaitStart() throws InterruptedException { + coordinator.awaitStart(); + } + + public void proceed() { + coordinator.signalLockerContinue(); + } + + public void awaitTermination() throws InterruptedException { + coordinator.awaitTermination(); + } + protected MemoryLocks.MemoryLockToken getLock() throws InterruptedException { return locks.getLock(name, Type.WRITE, timeout); } http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java index b7dee7e..ee83867 100644 --- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java +++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java @@ -26,6 +26,7 @@ import org.apache.oozie.lock.LockToken; import org.apache.oozie.lock.TestMemoryLocks; import org.apache.oozie.service.ZKLocksService.ZKLockToken; import org.apache.oozie.test.ZKXTestCase; +import org.apache.oozie.util.Locker; import org.apache.oozie.util.XLog; import org.apache.oozie.util.ZKUtils; import org.apache.zookeeper.KeeperException.ConnectionLossException; @@ -64,59 +65,12 @@ public class TestZKLocksService extends ZKXTestCase { } } - public abstract class Locker implements Runnable { - protected String name; - private String nameIndex; - private StringBuffer sb; - protected long timeout; - protected ZKLocksService zkls; - - public Locker(String name, int nameIndex, long timeout, StringBuffer buffer, ZKLocksService zkls) { - this.name = name; - this.nameIndex = name + ":" + nameIndex; - this.sb = buffer; - this.timeout = timeout; - this.zkls = zkls; - } - - @Override - public void run() { - try { - log.info("Getting lock [{0}]", nameIndex); - LockToken token = getLock(); - if (token != null) { - log.info("Got lock [{0}]", nameIndex); - sb.append(nameIndex).append("-L "); - synchronized (this) { - wait(); - } - sb.append(nameIndex).append("-U "); - token.release(); - log.info("Release lock [{0}]", nameIndex); - } - else { - sb.append(nameIndex).append("-N "); - log.info("Did not get lock [{0}]", nameIndex); - } - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - public void finish() { - synchronized (this) { - notify(); - } - } - - protected abstract ZKLocksService.ZKLockToken getLock() throws InterruptedException; - } - public class ReadLocker extends Locker { + private final ZKLocksService zkls; public ReadLocker(String name, int nameIndex, long timeout, StringBuffer buffer, ZKLocksService zkls) { - super(name, nameIndex, timeout, buffer, zkls); + super(name, nameIndex, timeout, buffer); + this.zkls = zkls; } @Override @@ -126,9 +80,11 @@ public class TestZKLocksService extends ZKXTestCase { } public class WriteLocker extends Locker { + private final ZKLocksService zkls; public WriteLocker(String name, int nameIndex, long timeout, StringBuffer buffer, ZKLocksService zkls) { - super(name, nameIndex, timeout, buffer, zkls); + super(name, nameIndex, timeout, buffer); + this.zkls = zkls; } @Override @@ -169,13 +125,16 @@ public class TestZKLocksService extends ZKXTestCase { Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2); new Thread(l1).start(); - sleep(1000); + l1.awaitLockAcquire(); new Thread(l2).start(); - sleep(1000); - l1.finish(); - sleep(1000); - l2.finish(); - sleep(1000); + l2.awaitStart(); + + l1.proceed(); + l2.proceed(); + + l1.awaitTermination(); + l2.awaitTermination(); + assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim()); } @@ -211,13 +170,17 @@ public class TestZKLocksService extends ZKXTestCase { Locker l2 = new WriteLocker("a", 2, 0, sb, zkls2); new Thread(l1).start(); - sleep(1000); + l1.awaitLockAcquire(); + new Thread(l2).start(); - sleep(1000); - l1.finish(); - sleep(1000); - l2.finish(); - sleep(1000); + l2.awaitStart(); + + l2.proceed(); + l2.awaitTermination(); + + l1.proceed(); + l1.awaitTermination(); + assertEquals("a:1-L a:2-N a:1-U", sb.toString().trim()); } @@ -253,13 +216,16 @@ public class TestZKLocksService extends ZKXTestCase { Locker l2 = new WriteLocker("a", 2, (long) (WAITFOR_RATIO * 2000), sb, zkls2); new Thread(l1).start(); - sleep(1000); + l1.awaitLockAcquire(); new Thread(l2).start(); - sleep(1000); - l1.finish(); - sleep(1000); - l2.finish(); - sleep(1000); + l2.awaitStart(); + + l1.proceed(); + l1.awaitTermination(); + + l2.proceed(); + l2.awaitTermination(); + assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim()); } @@ -295,13 +261,16 @@ public class TestZKLocksService extends ZKXTestCase { Locker l2 = new WriteLocker("a", 2, 50, sb, zkls2); new Thread(l1).start(); - sleep(1000); + l1.awaitLockAcquire(); new Thread(l2).start(); - sleep(1000); - l1.finish(); - sleep(1000); - l2.finish(); - sleep(1000); + l2.awaitStart(); + + l2.proceed(); + l2.awaitTermination(); + + l1.proceed(); + l1.awaitTermination(); + assertEquals("a:1-L a:2-N a:1-U", sb.toString().trim()); } @@ -337,13 +306,16 @@ public class TestZKLocksService extends ZKXTestCase { Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2); new Thread(l1).start(); - sleep(1000); + l1.awaitLockAcquire(); new Thread(l2).start(); - sleep(1000); - l1.finish(); - sleep(1000); - l2.finish(); - sleep(1000); + l2.awaitLockAcquire(); + + l1.proceed(); + l1.awaitTermination(); + + l2.proceed(); + l2.awaitTermination(); + assertEquals("a:1-L a:2-L a:1-U a:2-U", sb.toString().trim()); } @@ -379,13 +351,16 @@ public class TestZKLocksService extends ZKXTestCase { Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2); new Thread(l1).start(); - sleep(1000); + l1.awaitLockAcquire(); new Thread(l2).start(); - sleep(1000); - l1.finish(); - sleep(1000); - l2.finish(); - sleep(1000); + l2.awaitStart(); + + l1.proceed(); + l1.awaitTermination(); + + l2.proceed(); + l2.awaitTermination(); + assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim()); } @@ -421,13 +396,15 @@ public class TestZKLocksService extends ZKXTestCase { Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2); new Thread(l1).start(); - sleep(1000); + l1.awaitLockAcquire(); new Thread(l2).start(); - sleep(1000); - l1.finish(); - sleep(1000); - l2.finish(); - sleep(1000); + l2.awaitStart(); + + l1.proceed(); + l1.awaitTermination(); + + l2.proceed(); + l2.awaitTermination(); assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim()); } @@ -583,9 +560,9 @@ public class TestZKLocksService extends ZKXTestCase { try { // Stop the exception on release() after some time in other thread Thread.sleep(TimeUnit.SECONDS.toMillis(13)); - Mockito.doAnswer(new Answer() { + Mockito.doAnswer(new Answer<Void>() { @Override - public Object answer(InvocationOnMock invocation) throws Throwable { + public Void answer(InvocationOnMock invocation) throws Throwable { lockReleased[0] = true; return null; } http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/core/src/test/java/org/apache/oozie/util/Locker.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/Locker.java b/core/src/test/java/org/apache/oozie/util/Locker.java new file mode 100644 index 0000000..23431e6 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/Locker.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.util; + +import org.apache.commons.logging.LogFactory; +import org.apache.oozie.lock.LockToken; + +public abstract class Locker implements Runnable { + private static final XLog log = new XLog(LogFactory.getLog(Locker.class)); + private final String nameIndex; + private final StringBuffer sb; + private final LockerCoordinator coordinator = new LockerCoordinator(); + + protected String name; + protected long timeout; + + public Locker(String name, int nameIndex, long timeout, StringBuffer buffer) { + this.name = name; + this.nameIndex = name + ":" + nameIndex; + this.sb = buffer; + this.timeout = timeout; + } + + @Override + public void run() { + try { + log.info("Getting lock [{0}]", nameIndex); + coordinator.startDone(); + LockToken token = getLock(); + if (token != null) { + log.info("Got lock [{0}]", nameIndex); + sb.append(nameIndex + "-L "); + + coordinator.lockAcquireDone(); + coordinator.awaitContinueSignal(); + + sb.append(nameIndex + "-U "); + token.release(); + log.info("Release lock [{0}]", nameIndex); + } + else { + coordinator.awaitContinueSignal(); + sb.append(nameIndex + "-N "); + log.info("Did not get lock [{0}]", nameIndex); + } + coordinator.terminated(); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + public void awaitLockAcquire() throws InterruptedException { + coordinator.awaitLockAcquire(); + } + + public void awaitStart() throws InterruptedException { + coordinator.awaitStart(); + } + + public void proceed() { + coordinator.signalLockerContinue(); + } + + public void awaitTermination() throws InterruptedException { + coordinator.awaitTermination(); + } + + protected abstract LockToken getLock() throws InterruptedException; +} http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/core/src/test/java/org/apache/oozie/util/LockerCoordinator.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/LockerCoordinator.java b/core/src/test/java/org/apache/oozie/util/LockerCoordinator.java new file mode 100644 index 0000000..67a7bfe --- /dev/null +++ b/core/src/test/java/org/apache/oozie/util/LockerCoordinator.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +// Helper class for some multithreaded tests +public class LockerCoordinator { + public static final int LATCH_TIMEOUT_SECONDS = 10; + + private final CountDownLatch startLatch = new CountDownLatch(1); + private final CountDownLatch acquireLockLatch = new CountDownLatch(1); + private final CountDownLatch proceedingLatch = new CountDownLatch(1); + private final CountDownLatch terminationLatch = new CountDownLatch(1); + + // Test thread waits until Locker thread starts to run + public void awaitStart() throws InterruptedException { + startLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + // Test thread waits until Locker thread terminates + public void awaitTermination() throws InterruptedException { + terminationLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + // Test thread waits until lock acquisition succeeds + public void awaitLockAcquire() throws InterruptedException { + acquireLockLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + // Locker thread blocks until test thread unblocks it + public void awaitContinueSignal() throws InterruptedException { + proceedingLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } + + // Test thread unblocks Locker thread + public void signalLockerContinue() { + proceedingLatch.countDown(); + } + + // Locker thread has started + public void startDone() { + startLatch.countDown(); + } + + // Locker thread acquired the lock + public void lockAcquireDone() { + acquireLockLatch.countDown(); + } + + // Locker thread finished + public void terminated() { + terminationLatch.countDown(); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 4411d60..8cb8c3d 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3237 Flaky test TestZKLocksService#testWriteReadLockThreads (pbacsko via andras.piros) OOZIE-3251 Disable JMX for ActiveMQ in the tests (pbacsko) OOZIE-2826 Upgrade joda-time to 2.9.9 (dbist13 via andras.piros) OOZIE-3094 [Docs] Fix for grammar mistake in DG_ActionAuthentication.twiki (dbist13 via andras.piros)