ACCUMULO-3180 Better test naming and some extra functionality to SlowIterator
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5459950d Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5459950d Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5459950d Branch: refs/heads/1.6 Commit: 5459950dd70f7df01cf5dbebb8324e80cb034548 Parents: d8feff8 Author: Josh Elser <[email protected]> Authored: Mon Sep 29 19:33:33 2014 -0400 Committer: Josh Elser <[email protected]> Committed: Mon Sep 29 19:33:33 2014 -0400 ---------------------------------------------------------------------- .../accumulo/test/functional/SlowIterator.java | 39 ++++-- .../apache/accumulo/test/Accumulo3030IT.java | 83 ------------- .../test/AllowScansToBeInterruptedIT.java | 123 +++++++++++++++++++ 3 files changed, 155 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5459950d/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java index a426b7f..a9b254e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java @@ -17,9 +17,13 @@ package org.apache.accumulo.test.functional; import java.io.IOException; +import java.util.Collection; import java.util.Map; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -27,24 +31,45 @@ import org.apache.accumulo.core.iterators.WrappingIterator; import org.apache.accumulo.core.util.UtilWaitThread; public class SlowIterator extends WrappingIterator { - - long sleepTime; - + + static private final String SLEEP_TIME = "sleepTime"; + static private final String SEEK_SLEEP_TIME = "seekSleepTime"; + private long sleepTime = 0; + private long seekSleepTime = 0; + + public static void setSleepTime(IteratorSetting is, long millis) { + is.addOption(SLEEP_TIME, Long.toString(millis)); + } + + public static void setSeekSleepTime(IteratorSetting is, long t) { + is.addOption(SEEK_SLEEP_TIME, Long.toString(t)); + } + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { throw new UnsupportedOperationException(); } - + @Override public void next() throws IOException { UtilWaitThread.sleep(sleepTime); super.next(); } - + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + UtilWaitThread.sleep(seekSleepTime); + super.seek(range, columnFamilies, inclusive); + } + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { super.init(source, options, env); - sleepTime = Long.parseLong(options.get("sleepTime")); + if (options.containsKey(SLEEP_TIME)) + sleepTime = Long.parseLong(options.get(SLEEP_TIME)); + + if (options.containsKey(SEEK_SLEEP_TIME)) + seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME)); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5459950d/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java b/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java deleted file mode 100644 index bc56346..0000000 --- a/test/src/test/java/org/apache/accumulo/test/Accumulo3030IT.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test; - -import java.util.Map.Entry; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.ConfigurableMacIT; -import org.apache.accumulo.test.functional.SlowIterator; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.Test; - -public class Accumulo3030IT extends ConfigurableMacIT { - - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setNumTservers(1); - } - - @Test(timeout = 60 * 1000) - public void test() throws Exception { - // make a table - final String tableName = getUniqueNames(1)[0]; - final Connector conn = getConnector(); - conn.tableOperations().create(tableName); - // make the world's slowest scanner - final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY); - final IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class); - SlowIterator.setSeekSleepTime(cfg, 99999*1000); - scanner.addScanIterator(cfg); - // create a thread to interrupt the slow scan - final Thread scanThread = Thread.currentThread(); - Thread thread = new Thread() { - @Override - public void run() { - try { - // ensure the scan is running: not perfect, the metadata tables could be scanned, too. - String tserver = conn.instanceOperations().getTabletServers().iterator().next(); - while (conn.instanceOperations().getActiveScans(tserver).size() < 1) { - UtilWaitThread.sleep(1000); - } - } catch (Exception e) { - e.printStackTrace(); - } - // BAM! - scanThread.interrupt(); - } - }; - thread.start(); - try { - // Use the scanner, expect problems - for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) { - } - Assert.fail("Scan should not succeed"); - } catch (Exception ex) { - } finally { - thread.join(); - } - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5459950d/test/src/test/java/org/apache/accumulo/test/AllowScansToBeInterruptedIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/AllowScansToBeInterruptedIT.java b/test/src/test/java/org/apache/accumulo/test/AllowScansToBeInterruptedIT.java new file mode 100644 index 0000000..bbef6ea --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/AllowScansToBeInterruptedIT.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.ActiveScan; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.minicluster.MiniAccumuloConfig; +import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AllowScansToBeInterruptedIT { + private static final Logger log = Logger.getLogger(AllowScansToBeInterruptedIT.class); + + public static TemporaryFolder folder = new TemporaryFolder(); + private MiniAccumuloCluster accumulo; + private String secret = "secret"; + + @Before + public void setUp() throws Exception { + folder.create(); + log.info("Using MAC at " + folder.getRoot()); + MiniAccumuloConfig cfg = new MiniAccumuloConfig(folder.getRoot(), secret); + cfg.setNumTservers(1); + accumulo = new MiniAccumuloCluster(cfg); + accumulo.start(); + } + + @After + public void tearDown() throws Exception { + accumulo.stop(); + folder.delete(); + } + + Connector getConnector() throws AccumuloException, AccumuloSecurityException { + ZooKeeperInstance zki = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers()); + return zki.getConnector("root", new PasswordToken(secret)); + } + + @Test(timeout = 60 * 1000) + public void test() throws Exception { + // make a table + final String tableName = "test"; + final Connector conn = getConnector(); + conn.tableOperations().create(tableName); + // make the world's slowest scanner + final Scanner scanner = conn.createScanner(tableName, Constants.NO_AUTHS); + final IteratorSetting cfg = new IteratorSetting(100, SlowIterator.class); + SlowIterator.setSeekSleepTime(cfg, 99999*1000); + scanner.addScanIterator(cfg); + // create a thread to interrupt the slow scan + final Thread scanThread = Thread.currentThread(); + Thread thread = new Thread() { + @Override + public void run() { + try { + // ensure the scan is running: not perfect, the metadata tables could be scanned, too. + String tserver = conn.instanceOperations().getTabletServers().iterator().next(); + List<ActiveScan> scans = null; + while (null == scans) { + try { + // Sometimes getting errors the first time around + scans = conn.instanceOperations().getActiveScans(tserver); + } catch (Exception e) { + log.warn("Could not connect to tserver " + tserver, e); + } + } + while (scans.size() < 1) { + UtilWaitThread.sleep(1000); + scans = conn.instanceOperations().getActiveScans(tserver); + } + } catch (Exception e) { + e.printStackTrace(); + } + // BAM! + scanThread.interrupt(); + } + }; + thread.start(); + try { + // Use the scanner, expect problems + for (@SuppressWarnings("unused") Entry<Key,Value> entry : scanner) { + } + Assert.fail("Scan should not succeed"); + } catch (Exception ex) { + } finally { + thread.join(); + } + } + +}
