Repository: oozie
Updated Branches:
  refs/heads/master 059462650 -> 526dbd198


OOZIE-3237 Flaky test TestZKLocksService#testWriteReadLockThreads (pbacsko via andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/526dbd19
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/526dbd19
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/526dbd19

Branch: refs/heads/master
Commit: 526dbd19814877af15a08b7b3a3b164a6737f9df
Parents: 0594626
Author: Andras Piros <andras.pi...@cloudera.com>
Authored: Tue May 22 19:30:25 2018 +0200
Committer: Andras Piros <andras.pi...@cloudera.com>
Committed: Tue May 22 19:30:25 2018 +0200

----------------------------------------------------------------------
 .../org/apache/oozie/lock/TestMemoryLocks.java  | 104 ++++--------
 .../oozie/service/TestZKLocksService.java       | 167 ++++++++-----------
 .../test/java/org/apache/oozie/util/Locker.java |  85 ++++++++++
 .../apache/oozie/util/LockerCoordinator.java    |  71 ++++++++
 release-log.txt                                 |   1 +
 5 files changed, 257 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
index 8c7b58e..1b7bcbd 100644
--- a/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
+++ b/core/src/test/java/org/apache/oozie/lock/TestMemoryLocks.java
@@ -19,18 +19,17 @@
 package org.apache.oozie.lock;
 
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.oozie.service.MemoryLocksService;
 import org.apache.oozie.service.MemoryLocksService.Type;
+import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.LockerCoordinator;
+import org.apache.oozie.util.Locker;
 import org.apache.oozie.util.XLog;
 
 public class TestMemoryLocks extends XTestCase {
-    private static final int LATCH_TIMEOUT = 10;
     private XLog log = XLog.getLog(getClass());
     public static final int DEFAULT_LOCK_TIMEOUT = 5 * 1000;
 
@@ -46,72 +45,6 @@ public class TestMemoryLocks extends XTestCase {
         super.tearDown();
     }
 
-    public abstract class LatchHandler {
-        protected CountDownLatch startLatch = new CountDownLatch(1);
-        protected CountDownLatch acquireLockLatch = new CountDownLatch(1);
-        protected CountDownLatch proceedingLatch = new CountDownLatch(1);
-        protected CountDownLatch terminationLatch = new CountDownLatch(1);
-
-        public void awaitStart() throws InterruptedException {
-            startLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
-        }
-
-        public void awaitTermination() throws InterruptedException {
-            terminationLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
-        }
-
-        public void awaitLockAcquire() throws InterruptedException {
-            acquireLockLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
-        }
-
-        public void proceed() {
-            proceedingLatch.countDown();
-        }
-    }
-
-    public abstract class Locker extends LatchHandler implements Runnable {
-        protected String name;
-        private String nameIndex;
-        private StringBuffer sb;
-        protected long timeout;
-
-        public Locker(String name, int nameIndex, long timeout, StringBuffer 
buffer) {
-            this.name = name;
-            this.nameIndex = name + ":" + nameIndex;
-            this.sb = buffer;
-            this.timeout = timeout;
-        }
-
-        public void run() {
-            try {
-                log.info("Getting lock [{0}]", nameIndex);
-                startLatch.countDown();
-                MemoryLocks.MemoryLockToken token = getLock();
-                if (token != null) {
-                    log.info("Got lock [{0}]", nameIndex);
-                    sb.append(nameIndex + "-L ");
-
-                    acquireLockLatch.countDown();
-                    proceedingLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
-
-                    sb.append(nameIndex + "-U ");
-                    token.release();
-                    log.info("Release lock [{0}]", nameIndex);
-                }
-                else {
-                    proceedingLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
-                    sb.append(nameIndex + "-N ");
-                    log.info("Did not get lock [{0}]", nameIndex);
-                }
-                terminationLatch.countDown();
-            }
-            catch (Exception ex) {
-                throw new RuntimeException(ex);
-            }
-        }
-
-        protected abstract MemoryLocks.MemoryLockToken getLock() throws 
InterruptedException;
-    }
 
     public class ReadLocker extends Locker {
 
@@ -119,6 +52,7 @@ public class TestMemoryLocks extends XTestCase {
             super(name, nameIndex, timeout, buffer);
         }
 
+        @Override
         protected MemoryLocks.MemoryLockToken getLock() throws 
InterruptedException {
             return locks.getLock(name, Type.READ, timeout);
         }
@@ -130,6 +64,7 @@ public class TestMemoryLocks extends XTestCase {
             super(name, nameIndex, timeout, buffer);
         }
 
+        @Override
         protected MemoryLocks.MemoryLockToken getLock() throws 
InterruptedException {
             return locks.getLock(name, Type.WRITE, timeout);
         }
@@ -275,11 +210,12 @@ public class TestMemoryLocks extends XTestCase {
         assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
     }
 
-    public class SameThreadWriteLocker extends LatchHandler implements 
Runnable {
+    public class SameThreadWriteLocker implements Runnable {
         protected String name;
         private String nameIndex;
         private StringBuffer sb;
         protected long timeout;
+        private final LockerCoordinator coordinator = new LockerCoordinator();
 
         public SameThreadWriteLocker(String name, int nameIndex, long timeout, 
StringBuffer buffer) {
             this.name = name;
@@ -290,13 +226,13 @@ public class TestMemoryLocks extends XTestCase {
 
         public void run() {
             try {
-                startLatch.countDown();
+                coordinator.startDone();
                 log.info("Getting lock [{0}]", nameIndex);
                 MemoryLocks.MemoryLockToken token = getLock();
                 MemoryLocks.MemoryLockToken token2 = getLock();
 
                 if (token != null) {
-                    acquireLockLatch.countDown();
+                    coordinator.lockAcquireDone();
 
                     log.info("Got lock [{0}]", nameIndex);
                     sb.append(nameIndex + "-L1 ");
@@ -305,7 +241,7 @@ public class TestMemoryLocks extends XTestCase {
                     }
                     sb.append(nameIndex + "-U1 ");
 
-                    proceedingLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
+                    coordinator.awaitContinueSignal();
 
                     token.release();
                     sb.append(nameIndex + "-U2 ");
@@ -313,17 +249,33 @@ public class TestMemoryLocks extends XTestCase {
                     log.info("Release lock [{0}]", nameIndex);
                 }
                 else {
-                    proceedingLatch.await(LATCH_TIMEOUT, TimeUnit.SECONDS);
+                    coordinator.awaitContinueSignal();
                     sb.append(nameIndex + "-N ");
                     log.info("Did not get lock [{0}]", nameIndex);
                 }
-                terminationLatch.countDown();
+                coordinator.terminated();
             }
             catch (Exception ex) {
                 throw new RuntimeException(ex);
             }
         }
 
+        public void awaitLockAcquire() throws InterruptedException {
+            coordinator.awaitLockAcquire();
+        }
+
+        public void awaitStart() throws InterruptedException {
+            coordinator.awaitStart();
+        }
+
+        public void proceed() {
+            coordinator.signalLockerContinue();
+        }
+
+        public void awaitTermination() throws InterruptedException {
+            coordinator.awaitTermination();
+        }
+
         protected MemoryLocks.MemoryLockToken getLock() throws 
InterruptedException {
             return locks.getLock(name, Type.WRITE, timeout);
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
index b7dee7e..ee83867 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKLocksService.java
@@ -26,6 +26,7 @@ import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.lock.TestMemoryLocks;
 import org.apache.oozie.service.ZKLocksService.ZKLockToken;
 import org.apache.oozie.test.ZKXTestCase;
+import org.apache.oozie.util.Locker;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.ZKUtils;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -64,59 +65,12 @@ public class TestZKLocksService extends ZKXTestCase {
         }
     }
 
-    public abstract class Locker implements Runnable {
-        protected String name;
-        private String nameIndex;
-        private StringBuffer sb;
-        protected long timeout;
-        protected ZKLocksService zkls;
-
-        public Locker(String name, int nameIndex, long timeout, StringBuffer 
buffer, ZKLocksService zkls) {
-            this.name = name;
-            this.nameIndex = name + ":" + nameIndex;
-            this.sb = buffer;
-            this.timeout = timeout;
-            this.zkls = zkls;
-        }
-
-        @Override
-        public void run() {
-            try {
-                log.info("Getting lock [{0}]", nameIndex);
-                LockToken token = getLock();
-                if (token != null) {
-                    log.info("Got lock [{0}]", nameIndex);
-                    sb.append(nameIndex).append("-L ");
-                    synchronized (this) {
-                        wait();
-                    }
-                    sb.append(nameIndex).append("-U ");
-                    token.release();
-                    log.info("Release lock [{0}]", nameIndex);
-                }
-                else {
-                    sb.append(nameIndex).append("-N ");
-                    log.info("Did not get lock [{0}]", nameIndex);
-                }
-            }
-            catch (Exception ex) {
-                throw new RuntimeException(ex);
-            }
-        }
-
-        public void finish() {
-            synchronized (this) {
-                notify();
-            }
-        }
-
-        protected abstract ZKLocksService.ZKLockToken getLock() throws 
InterruptedException;
-    }
-
     public class ReadLocker extends Locker {
+        private final ZKLocksService zkls;
 
         public ReadLocker(String name, int nameIndex, long timeout, 
StringBuffer buffer, ZKLocksService zkls) {
-            super(name, nameIndex, timeout, buffer, zkls);
+            super(name, nameIndex, timeout, buffer);
+            this.zkls = zkls;
         }
 
         @Override
@@ -126,9 +80,11 @@ public class TestZKLocksService extends ZKXTestCase {
     }
 
     public class WriteLocker extends Locker {
+        private final ZKLocksService zkls;
 
         public WriteLocker(String name, int nameIndex, long timeout, 
StringBuffer buffer, ZKLocksService zkls) {
-            super(name, nameIndex, timeout, buffer, zkls);
+            super(name, nameIndex, timeout, buffer);
+            this.zkls = zkls;
         }
 
         @Override
@@ -169,13 +125,16 @@ public class TestZKLocksService extends ZKXTestCase {
         Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2);
 
         new Thread(l1).start();
-        sleep(1000);
+        l1.awaitLockAcquire();
         new Thread(l2).start();
-        sleep(1000);
-        l1.finish();
-        sleep(1000);
-        l2.finish();
-        sleep(1000);
+        l2.awaitStart();
+
+        l1.proceed();
+        l2.proceed();
+
+        l1.awaitTermination();
+        l2.awaitTermination();
+
         assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
     }
 
@@ -211,13 +170,17 @@ public class TestZKLocksService extends ZKXTestCase {
         Locker l2 = new WriteLocker("a", 2, 0, sb, zkls2);
 
         new Thread(l1).start();
-        sleep(1000);
+        l1.awaitLockAcquire();
+
         new Thread(l2).start();
-        sleep(1000);
-        l1.finish();
-        sleep(1000);
-        l2.finish();
-        sleep(1000);
+        l2.awaitStart();
+
+        l2.proceed();
+        l2.awaitTermination();
+
+        l1.proceed();
+        l1.awaitTermination();
+
         assertEquals("a:1-L a:2-N a:1-U", sb.toString().trim());
     }
 
@@ -253,13 +216,16 @@ public class TestZKLocksService extends ZKXTestCase {
         Locker l2 = new WriteLocker("a", 2, (long) (WAITFOR_RATIO * 2000), sb, 
zkls2);
 
         new Thread(l1).start();
-        sleep(1000);
+        l1.awaitLockAcquire();
         new Thread(l2).start();
-        sleep(1000);
-        l1.finish();
-        sleep(1000);
-        l2.finish();
-        sleep(1000);
+        l2.awaitStart();
+
+        l1.proceed();
+        l1.awaitTermination();
+
+        l2.proceed();
+        l2.awaitTermination();
+
         assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
     }
 
@@ -295,13 +261,16 @@ public class TestZKLocksService extends ZKXTestCase {
         Locker l2 = new WriteLocker("a", 2, 50, sb, zkls2);
 
         new Thread(l1).start();
-        sleep(1000);
+        l1.awaitLockAcquire();
         new Thread(l2).start();
-        sleep(1000);
-        l1.finish();
-        sleep(1000);
-        l2.finish();
-        sleep(1000);
+        l2.awaitStart();
+
+        l2.proceed();
+        l2.awaitTermination();
+
+        l1.proceed();
+        l1.awaitTermination();
+
         assertEquals("a:1-L a:2-N a:1-U", sb.toString().trim());
     }
 
@@ -337,13 +306,16 @@ public class TestZKLocksService extends ZKXTestCase {
         Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2);
 
         new Thread(l1).start();
-        sleep(1000);
+        l1.awaitLockAcquire();
         new Thread(l2).start();
-        sleep(1000);
-        l1.finish();
-        sleep(1000);
-        l2.finish();
-        sleep(1000);
+        l2.awaitLockAcquire();
+
+        l1.proceed();
+        l1.awaitTermination();
+
+        l2.proceed();
+        l2.awaitTermination();
+
         assertEquals("a:1-L a:2-L a:1-U a:2-U", sb.toString().trim());
     }
 
@@ -379,13 +351,16 @@ public class TestZKLocksService extends ZKXTestCase {
         Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2);
 
         new Thread(l1).start();
-        sleep(1000);
+        l1.awaitLockAcquire();
         new Thread(l2).start();
-        sleep(1000);
-        l1.finish();
-        sleep(1000);
-        l2.finish();
-        sleep(1000);
+        l2.awaitStart();
+
+        l1.proceed();
+        l1.awaitTermination();
+
+        l2.proceed();
+        l2.awaitTermination();
+
         assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
     }
 
@@ -421,13 +396,15 @@ public class TestZKLocksService extends ZKXTestCase {
         Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2);
 
         new Thread(l1).start();
-        sleep(1000);
+        l1.awaitLockAcquire();
         new Thread(l2).start();
-        sleep(1000);
-        l1.finish();
-        sleep(1000);
-        l2.finish();
-        sleep(1000);
+        l2.awaitStart();
+
+        l1.proceed();
+        l1.awaitTermination();
+
+        l2.proceed();
+        l2.awaitTermination();
         assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
     }
 
@@ -583,9 +560,9 @@ public class TestZKLocksService extends ZKXTestCase {
                     try {
                         // Stop the exception on release() after some time in 
other thread
                         Thread.sleep(TimeUnit.SECONDS.toMillis(13));
-                        Mockito.doAnswer(new Answer() {
+                        Mockito.doAnswer(new Answer<Void>() {
                             @Override
-                            public Object answer(InvocationOnMock invocation) 
throws Throwable {
+                            public Void answer(InvocationOnMock invocation) 
throws Throwable {
                                 lockReleased[0] = true;
                                 return null;
                             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/core/src/test/java/org/apache/oozie/util/Locker.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/Locker.java b/core/src/test/java/org/apache/oozie/util/Locker.java
new file mode 100644
index 0000000..23431e6
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/Locker.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.util;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.oozie.lock.LockToken;
+
+public abstract class Locker implements Runnable {
+    private static final XLog log = new XLog(LogFactory.getLog(Locker.class));
+    private final String nameIndex;
+    private final StringBuffer sb;
+    private final LockerCoordinator coordinator = new LockerCoordinator();
+
+    protected String name;
+    protected long timeout;
+
+    public Locker(String name, int nameIndex, long timeout, StringBuffer 
buffer) {
+        this.name = name;
+        this.nameIndex = name + ":" + nameIndex;
+        this.sb = buffer;
+        this.timeout = timeout;
+    }
+
+    @Override
+    public void run() {
+        try {
+            log.info("Getting lock [{0}]", nameIndex);
+            coordinator.startDone();
+            LockToken token = getLock();
+            if (token != null) {
+                log.info("Got lock [{0}]", nameIndex);
+                sb.append(nameIndex + "-L ");
+
+                coordinator.lockAcquireDone();
+                coordinator.awaitContinueSignal();
+
+                sb.append(nameIndex + "-U ");
+                token.release();
+                log.info("Release lock [{0}]", nameIndex);
+            }
+            else {
+                coordinator.awaitContinueSignal();
+                sb.append(nameIndex + "-N ");
+                log.info("Did not get lock [{0}]", nameIndex);
+            }
+            coordinator.terminated();
+        }
+        catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    public void awaitLockAcquire() throws InterruptedException {
+        coordinator.awaitLockAcquire();
+    }
+
+    public void awaitStart() throws InterruptedException {
+        coordinator.awaitStart();
+    }
+
+    public void proceed() {
+        coordinator.signalLockerContinue();
+    }
+
+    public void awaitTermination() throws InterruptedException {
+        coordinator.awaitTermination();
+    }
+
+    protected abstract LockToken getLock() throws InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/core/src/test/java/org/apache/oozie/util/LockerCoordinator.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/LockerCoordinator.java b/core/src/test/java/org/apache/oozie/util/LockerCoordinator.java
new file mode 100644
index 0000000..67a7bfe
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/LockerCoordinator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+// Helper class for some multithreaded tests
+public class LockerCoordinator {
+    public static final int LATCH_TIMEOUT_SECONDS = 10;
+
+    private final CountDownLatch startLatch = new CountDownLatch(1);
+    private final CountDownLatch acquireLockLatch = new CountDownLatch(1);
+    private final CountDownLatch proceedingLatch = new CountDownLatch(1);
+    private final CountDownLatch terminationLatch = new CountDownLatch(1);
+
+    // Test thread waits until Locker thread starts to run
+    public void awaitStart() throws InterruptedException {
+        startLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    // Test thread waits until Locker thread terminates
+    public void awaitTermination() throws InterruptedException {
+        terminationLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    // Test thread waits until lock acquisition succeeds
+    public void awaitLockAcquire() throws InterruptedException {
+        acquireLockLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    // Locker thread blocks until test thread unblocks it
+    public void awaitContinueSignal() throws InterruptedException {
+        proceedingLatch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    // Test thread unblocks Locker thread
+    public void signalLockerContinue() {
+        proceedingLatch.countDown();
+    }
+
+    // Locker thread has started
+    public void startDone() {
+        startLatch.countDown();
+    }
+
+    // Locker thread acquired the lock
+    public void lockAcquireDone() {
+        acquireLockLatch.countDown();
+    }
+
+    // Locker thread finished
+    public void terminated() {
+        terminationLatch.countDown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/526dbd19/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4411d60..8cb8c3d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3237 Flaky test TestZKLocksService#testWriteReadLockThreads (pbacsko via andras.piros)
 OOZIE-3251 Disable JMX for ActiveMQ in the tests (pbacsko)
 OOZIE-2826 Upgrade joda-time to 2.9.9 (dbist13 via andras.piros)
 OOZIE-3094 [Docs] Fix for grammar mistake in DG_ActionAuthentication.twiki 
(dbist13 via andras.piros)

Reply via email to