Repository: incubator-tephra Updated Branches: refs/heads/master ae6ce2b5e -> 7cfe06125
(TEPHRA-253) Fix flaky TransactionProcessorTest This closes #54 from GitHub. Signed-off-by: anew <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/7cfe0612 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/7cfe0612 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/7cfe0612 Branch: refs/heads/master Commit: 7cfe0612538ead57ec66e580ed1d2d9e950c5c73 Parents: ae6ce2b Author: anew <[email protected]> Authored: Fri Sep 8 23:43:55 2017 -0700 Committer: anew <[email protected]> Committed: Sun Sep 10 21:08:51 2017 -0700 ---------------------------------------------------------------------- .../coprocessor/TransactionProcessorTest.java | 22 ++++++++++++++--- .../coprocessor/TransactionProcessorTest.java | 23 ++++++++++++++--- .../coprocessor/TransactionProcessorTest.java | 23 ++++++++++++++--- .../coprocessor/TransactionProcessorTest.java | 23 ++++++++++++++--- .../coprocessor/TransactionProcessorTest.java | 26 ++++++++++++++++---- .../coprocessor/TransactionProcessorTest.java | 24 +++++++++++++++--- 6 files changed, 121 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 1879116..3c7d1e2 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -74,6 +74,7 @@ import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.apache.tephra.util.TxUtils; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -175,7 +176,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); for (int i = 1; i <= 8; i++) { for (int k = 1; k <= i; k++) { @@ -190,7 +191,7 @@ public class TransactionProcessorTest { // force a flush to clear the data // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set LOG.info("Flushing region " + region.getRegionNameAsString()); - region.flushcache(); + region.flushcache(); // in 0.96, there is no indication of success // now a normal scan should only return the valid rows - testing that cleanup works on flush Scan scan = new Scan(); @@ -231,7 +232,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); byte[] row = Bytes.toBytes(1); for (int i = 4; i < V.length; i++) { @@ -620,6 +621,21 @@ public class TransactionProcessorTest { cache.stopAndWait(); } + private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException { + long timeout = 5000; // ms + do { + TransactionVisibilityState state = cache.getLatestState(); + if (state != null) { + return state; + } + TimeUnit.MILLISECONDS.sleep(100); + timeout -= 100; + } while (timeout > 0L); + LOG.error("Timed out waiting foe transaction state cache"); + Assert.fail("Timed out waiting foe transaction state cache"); + return null; + } + private static class MockRegionServerServices implements RegionServerServices { private final Configuration hConf; private final ZooKeeperWatcher zookeeper; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index abe375d..b8e051b 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -80,6 +80,7 @@ import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.apache.tephra.util.TxUtils; import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -180,7 +181,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); for (int i = 1; i <= 8; i++) { for (int k = 1; k <= i; k++) { @@ -195,7 +196,8 @@ public class TransactionProcessorTest { // force a flush to clear the data // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set LOG.info("Flushing region " + region.getRegionNameAsString()); - region.flushcache(); + HRegion.FlushResult flushResult = region.flushcache(); + Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded()); // now a normal scan should only return the valid rows // do not use a filter here to test that cleanup works on flush @@ -237,7 +239,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); byte[] row = Bytes.toBytes(1); for (int i = 4; i < V.length; i++) { @@ -624,6 +626,21 @@ public class TransactionProcessorTest { cache.stopAndWait(); } + private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException { + long timeout = 5000; // ms + do { + TransactionVisibilityState state = cache.getLatestState(); + if (state != null) { + return state; + } + TimeUnit.MILLISECONDS.sleep(100); + timeout -= 100; + } while (timeout > 0L); + LOG.error("Timed out waiting foe transaction state cache"); + Assert.fail("Timed out waiting foe transaction state cache"); + return null; + } + private static class MockRegionServerServices implements RegionServerServices { private final Configuration hConf; private final ZooKeeperWatcher zookeeper; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index f6d8e2d..9ce30b5 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -62,6 +62,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec; import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.apache.tephra.util.TxUtils; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -160,7 +161,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); for (int i = 1; i <= 8; i++) { for (int k = 1; k <= i; k++) { @@ -175,7 +176,8 @@ public class TransactionProcessorTest { // force a flush to clear the data // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set LOG.info("Flushing region " + region.getRegionNameAsString()); - region.flushcache(); + HRegion.FlushResult flushResult = region.flushcache(); + Assert.assertTrue("Unexpected flush result: " + flushResult.toString(), flushResult.isFlushSucceeded()); // now a normal scan should only return the valid rows // do not use a filter here to test that cleanup works on flush @@ -217,7 +219,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); byte[] row = Bytes.toBytes(1); for (int i = 4; i < V.length; i++) { @@ -606,6 +608,21 @@ public class TransactionProcessorTest { cache.stopAndWait(); } + private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException { + long timeout = 5000; // ms + do { + TransactionVisibilityState state = cache.getLatestState(); + if (state != null) { + return state; + } + TimeUnit.MILLISECONDS.sleep(100); + timeout -= 100; + } while (timeout > 0L); + LOG.error("Timed out waiting foe transaction state cache"); + Assert.fail("Timed out waiting foe transaction state cache"); + return null; + } + private static class LocalRegionServerServices extends MockRegionServerServices { private final ServerName serverName; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 8dfce32..0ec3b46 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -62,6 +62,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec; import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.apache.tephra.util.TxUtils; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -160,7 +161,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); for (int i = 1; i <= 8; i++) { for (int k = 1; k <= i; k++) { @@ -175,7 +176,8 @@ public class TransactionProcessorTest { // force a flush to clear the data // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set LOG.info("Flushing region " + region.getRegionNameAsString()); - region.flushcache(); + HRegion.FlushResult flushResult = region.flushcache(); + Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded()); // now a normal scan should only return the valid rows // do not use a filter here to test that cleanup works on flush @@ -217,7 +219,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); byte[] row = Bytes.toBytes(1); for (int i = 4; i < V.length; i++) { @@ -606,6 +608,21 @@ public class TransactionProcessorTest { cache.stopAndWait(); } + private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException { + long timeout = 5000; // ms + do { + TransactionVisibilityState state = cache.getLatestState(); + if (state != null) { + return state; + } + TimeUnit.MILLISECONDS.sleep(100); + timeout -= 100; + } while (timeout > 0L); + LOG.error("Timed out waiting foe transaction state cache"); + Assert.fail("Timed out waiting foe transaction state cache"); + return null; + } + private static class LocalRegionServerServices extends MockRegionServerServices { private final ServerName serverName; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 9f7206d..f133735 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.util.Bytes; @@ -62,6 +63,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec; import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.apache.tephra.util.TxUtils; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -160,7 +162,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); for (int i = 1; i <= 8; i++) { for (int k = 1; k <= i; k++) { @@ -174,10 +176,10 @@ public class TransactionProcessorTest { // force a flush to clear the data // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set - LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString()); - region.flushcache(true, false); - + Region.FlushResult flushResult = region.flushcache(true, false); + Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded()); + // now a normal scan should only return the valid rows // do not use a filter here to test that cleanup works on flush Scan scan = new Scan(); @@ -218,7 +220,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); byte[] row = Bytes.toBytes(1); for (int i = 4; i < V.length; i++) { @@ -608,6 +610,20 @@ public class TransactionProcessorTest { cache.stopAndWait(); } + private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException { + long timeout = 5000; // ms + do { + TransactionVisibilityState state = cache.getLatestState(); + if (state != null) { + return state; + } + TimeUnit.MILLISECONDS.sleep(100); + timeout -= 100; + } while (timeout > 0L); + Assert.fail("Timed out waiting foe transaction state cache"); + return null; + } + private static class LocalRegionServerServices extends MockRegionServerServices { private final ServerName serverName; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/7cfe0612/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java index 15842a3..4c8fa64 100644 --- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.util.Bytes; @@ -62,6 +63,7 @@ import org.apache.tephra.snapshot.DefaultSnapshotCodec; import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.apache.tephra.util.TxUtils; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -160,7 +162,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); for (int i = 1; i <= 8; i++) { for (int k = 1; k <= i; k++) { @@ -176,7 +178,8 @@ public class TransactionProcessorTest { // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString()); - region.flushcache(true, false); + Region.FlushResult flushResult = region.flushcache(true, false); + Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded()); // now a normal scan should only return the valid rows // do not use a filter here to test that cleanup works on flush @@ -218,7 +221,7 @@ public class TransactionProcessorTest { try { region.initialize(); TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); - LOG.info("Coprocessor is using transaction state: " + cache.getLatestState()); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); byte[] row = Bytes.toBytes(1); for (int i = 4; i < V.length; i++) { @@ -608,6 +611,21 @@ public class TransactionProcessorTest { cache.stopAndWait(); } + private TransactionVisibilityState waitForTransactionState(TransactionStateCache cache) throws InterruptedException { + long timeout = 5000; // ms + do { + TransactionVisibilityState state = cache.getLatestState(); + if (state != null) { + return state; + } + TimeUnit.MILLISECONDS.sleep(100); + timeout -= 100; + } while (timeout > 0L); + LOG.error("Timed out waiting foe transaction state cache"); + Assert.fail("Timed out waiting foe transaction state cache"); + return null; + } + private static class LocalRegionServerServices extends MockRegionServerServices { private final ServerName serverName;
