http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java index 779b407,0000000..751e827 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CleanTmpIT.java @@@ -1,112 -1,0 +1,112 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; + +public class CleanTmpIT extends ConfigurableMacBase { + private static final Logger log = LoggerFactory.getLogger(CleanTmpIT.class); + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setNumTservers(1); + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + + @Test + public void test() throws Exception { + Connector c = getConnector(); + // make a table + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + // write to it + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + Mutation m = new Mutation("row"); + m.put("cf", "cq", "value"); + bw.addMutation(m); + bw.flush(); + + // Compact memory to make a file + c.tableOperations().compact(tableName, null, null, true, true); + + // Make sure that we'll have a WAL + m = new Mutation("row2"); + m.put("cf", "cq", "value"); + bw.addMutation(m); + bw.close(); + + // create a fake _tmp file in its directory + String id = c.tableOperations().tableIdMap().get(tableName); + Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.setRange(Range.prefix(id)); + s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + Path file = new Path(entry.getKey().getColumnQualifier().toString()); + + FileSystem fs = getCluster().getFileSystem(); + assertTrue("Could not find file: " + file, fs.exists(file)); + Path tabletDir = file.getParent(); + assertNotNull("Tablet dir should not be null", tabletDir); + Path tmp = new Path(tabletDir, "junk.rf_tmp"); + // Make the file + fs.create(tmp).close(); + log.info("Created tmp file {}", tmp.toString()); + getCluster().stop(); + getCluster().start(); + + Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY); + assertEquals(2, Iterators.size(scanner.iterator())); + // If we performed log recovery, we should have cleaned up any stray files + assertFalse("File still exists: " + tmp, fs.exists(tmp)); + } +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 003d66f,0000000..ee181df mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@@ -1,185 -1,0 +1,185 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.cli.ClientOpts.Password; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + +public class CompactionIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(CompactionIT.class); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN, "4"); + cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); + cfg.setProperty(Property.TSERV_MAJC_MAXCONCURRENT, "1"); + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + + private String majcThreadMaxOpen, majcDelay, majcMaxConcurrent; + + @Before + public void alterConfig() throws Exception { + if (ClusterType.STANDALONE == getClusterType()) { + InstanceOperations iops = getConnector().instanceOperations(); + Map<String,String> config = iops.getSystemConfiguration(); + majcThreadMaxOpen = config.get(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()); + majcDelay = config.get(Property.TSERV_MAJC_DELAY.getKey()); + majcMaxConcurrent = config.get(Property.TSERV_MAJC_MAXCONCURRENT.getKey()); + + iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4"); + iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "1"); + iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1"); + + getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getClusterControl().startAllServers(ServerType.TABLET_SERVER); + } + } + + @After + public void resetConfig() throws Exception { + // We set the values.. + if (null != majcThreadMaxOpen) { + InstanceOperations iops = getConnector().instanceOperations(); + + iops.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), majcThreadMaxOpen); + iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay); + iops.setProperty(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), majcMaxConcurrent); + + getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getClusterControl().startAllServers(ServerType.TABLET_SERVER); + } + } + + @Test + public void test() throws Exception { + final Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0"); + FileSystem fs = getFileSystem(); + Path root = new Path(cluster.getTemporaryPath(), getClass().getName()); + Path testrf = new Path(root, "testrf"); + FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), 500000, 59, 4); + + FunctionalTestUtils.bulkImport(c, fs, tableName, testrf.toString()); + int beforeCount = countFiles(c); + + final AtomicBoolean fail = new AtomicBoolean(false); + final ClientConfiguration clientConf = cluster.getClientConfig(); + final int THREADS = 5; + for (int count = 0; count < THREADS; count++) { + ExecutorService executor = Executors.newFixedThreadPool(THREADS); + final int span = 500000 / 59; + for (int i = 0; i < 500000; i += 500000 / 59) { + final int finalI = i; + Runnable r = new Runnable() { + @Override + public void run() { + try { + VerifyIngest.Opts opts = new VerifyIngest.Opts(); + opts.startRow = finalI; + opts.rows = span; + opts.random = 56; + opts.dataSize = 50; + opts.cols = 1; + opts.setTableName(tableName); + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConf); + } else { + opts.setPrincipal(getAdminPrincipal()); + PasswordToken passwordToken = (PasswordToken) getAdminToken(); + opts.setPassword(new Password(new String(passwordToken.getPassword(), UTF_8))); + } + VerifyIngest.verifyIngest(c, opts, new ScannerOpts()); + } catch (Exception ex) { + log.warn("Got exception verifying data", ex); + fail.set(true); + } + } + }; + executor.execute(r); + } + executor.shutdown(); + executor.awaitTermination(defaultTimeoutSeconds(), TimeUnit.SECONDS); + assertFalse("Failed to successfully run all threads, Check the test output for error", fail.get()); + } + + int finalCount = countFiles(c); + assertTrue(finalCount < beforeCount); + try { + getClusterControl().adminStopAll(); + } finally { + // Make sure the internal state in the cluster is reset (e.g. processes in MAC) + getCluster().stop(); + if (ClusterType.STANDALONE == getClusterType()) { + // Then restart things for the next test if it's a standalone + getCluster().start(); + } + } + } + + private int countFiles(Connector c) throws Exception { + Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.fetchColumnFamily(MetadataSchema.TabletsSection.TabletColumnFamily.NAME); + s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + return Iterators.size(s.iterator()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/DurabilityIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/DurabilityIT.java index ce9ad85,0000000..4078e69 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/DurabilityIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/DurabilityIT.java @@@ -1,233 -1,0 +1,233 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeFalse; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.test.mrit.IntegrationTestMapReduce; +import org.apache.accumulo.test.PerformanceTest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + +@Category(PerformanceTest.class) +public class DurabilityIT extends ConfigurableMacBase { + private static final Logger log = LoggerFactory.getLogger(DurabilityIT.class); + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setNumTservers(1); + } + + @BeforeClass + static public void checkMR() { + assumeFalse(IntegrationTestMapReduce.isMapReduce()); + } + + static final long N = 100000; + + private String[] init() throws Exception { + String[] tableNames = getUniqueNames(4); + Connector c = getConnector(); + TableOperations tableOps = c.tableOperations(); + createTable(tableNames[0]); + createTable(tableNames[1]); + createTable(tableNames[2]); + createTable(tableNames[3]); + // default is sync + tableOps.setProperty(tableNames[1], Property.TABLE_DURABILITY.getKey(), "flush"); + tableOps.setProperty(tableNames[2], Property.TABLE_DURABILITY.getKey(), "log"); + tableOps.setProperty(tableNames[3], Property.TABLE_DURABILITY.getKey(), "none"); + return tableNames; + } + + private void cleanup(String[] tableNames) throws Exception { + Connector c = getConnector(); + for (String tableName : tableNames) { + c.tableOperations().delete(tableName); + } + } + + private void createTable(String tableName) throws Exception { + TableOperations tableOps = getConnector().tableOperations(); + tableOps.create(tableName); + } + + @Test(timeout = 2 * 60 * 1000) + public void testWriteSpeed() throws Exception { + TableOperations tableOps = getConnector().tableOperations(); + String tableNames[] = init(); + // write some gunk, delete the table to keep that table from messing with the performance numbers of successive calls + // sync + long t0 = writeSome(tableNames[0], N); + tableOps.delete(tableNames[0]); + // flush + long t1 = writeSome(tableNames[1], N); + tableOps.delete(tableNames[1]); + // log + long t2 = writeSome(tableNames[2], N); + tableOps.delete(tableNames[2]); + // none + long t3 = writeSome(tableNames[3], N); + tableOps.delete(tableNames[3]); + System.out.println(String.format("sync %d flush %d log %d none %d", t0, t1, t2, t3)); + assertTrue("flush should be faster than sync", t0 > t1); + assertTrue("log should be faster than flush", t1 > t2); + assertTrue("no durability should be faster than log", t2 > t3); + } + + @Test(timeout = 4 * 60 * 1000) + public void testSync() throws Exception { + String tableNames[] = init(); + // sync table should lose nothing + writeSome(tableNames[0], N); + restartTServer(); + assertEquals(N, readSome(tableNames[0])); + cleanup(tableNames); + } + + @Test(timeout = 4 * 60 * 1000) + public void testFlush() throws Exception { + String tableNames[] = init(); + // flush table won't lose anything since we're not losing power/dfs + writeSome(tableNames[1], N); + restartTServer(); + assertEquals(N, readSome(tableNames[1])); + cleanup(tableNames); + } + + @Test(timeout = 4 * 60 * 1000) + public void testLog() throws Exception { + String tableNames[] = init(); + // we're probably going to lose something the the log setting + writeSome(tableNames[2], N); + restartTServer(); + long numResults = readSome(tableNames[2]); + assertTrue("Expected " + N + " >= " + numResults, N >= numResults); + cleanup(tableNames); + } + + @Test(timeout = 4 * 60 * 1000) + public void testNone() throws Exception { + String tableNames[] = init(); + // probably won't get any data back without logging + writeSome(tableNames[3], N); + restartTServer(); + long numResults = readSome(tableNames[3]); + assertTrue("Expected " + N + " >= " + numResults, N >= numResults); + cleanup(tableNames); + } + + @Test(timeout = 4 * 60 * 1000) + public void testIncreaseDurability() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none"); + writeSome(tableName, N); + restartTServer(); + long numResults = readSome(tableName); + assertTrue("Expected " + N + " >= " + numResults, N >= numResults); + c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync"); + writeSome(tableName, N); + restartTServer(); + assertTrue(N == readSome(tableName)); + } + + private static Map<String,String> map(Iterable<Entry<String,String>> entries) { + Map<String,String> result = new HashMap<String,String>(); + for (Entry<String,String> entry : entries) { + result.put(entry.getKey(), entry.getValue()); + } + return result; + } + + @Test(timeout = 4 * 60 * 1000) + public void testMetaDurability() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.instanceOperations().setProperty(Property.TABLE_DURABILITY.getKey(), "none"); + Map<String,String> props = map(c.tableOperations().getProperties(MetadataTable.NAME)); + assertEquals("sync", props.get(Property.TABLE_DURABILITY.getKey())); + c.tableOperations().create(tableName); + props = map(c.tableOperations().getProperties(tableName)); + assertEquals("none", props.get(Property.TABLE_DURABILITY.getKey())); + restartTServer(); + assertTrue(c.tableOperations().exists(tableName)); + } + + private long readSome(String table) throws Exception { + return Iterators.size(getConnector().createScanner(table, Authorizations.EMPTY).iterator()); + } + + private void restartTServer() throws Exception { + for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, proc); + } + cluster.start(); + } + + private long writeSome(String table, long count) throws Exception { + int iterations = 5; + long[] attempts = new long[iterations]; + for (int attempt = 0; attempt < iterations; attempt++) { + long now = System.currentTimeMillis(); + Connector c = getConnector(); + BatchWriter bw = c.createBatchWriter(table, null); + for (int i = 1; i < count + 1; i++) { + Mutation m = new Mutation("" + i); + m.put("", "", ""); + bw.addMutation(m); + if (i % (Math.max(1, count / 100)) == 0) { + bw.flush(); + } + } + bw.close(); + attempts[attempt] = System.currentTimeMillis() - now; + } + Arrays.sort(attempts); + log.info("Attempt durations: {}", Arrays.toString(attempts)); + // Return the median duration + return attempts[2]; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java index 7f11851,0000000..25e541b mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java @@@ -1,309 -1,0 +1,309 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.ServerServices; +import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.minicluster.MemoryUnit; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessNotFoundException; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Iterators; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +public class GarbageCollectorIT extends ConfigurableMacBase { + private static final String OUR_SECRET = "itsreallysecret"; + + @Override + public int defaultTimeoutSeconds() { + return 5 * 60; + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); ++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s"); + cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET); + cfg.setProperty(Property.GC_CYCLE_START, "1"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1"); + cfg.setProperty(Property.GC_PORT, "0"); + cfg.setProperty(Property.TSERV_MAXMEM, "5K"); + cfg.setProperty(Property.TSERV_MAJC_DELAY, "1"); + + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + private void killMacGc() throws ProcessNotFoundException, InterruptedException, KeeperException { + // kill gc started by MAC + getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR).iterator().next()); + // delete lock in zookeeper if there, this will allow next GC to start quickly + String path = ZooUtil.getRoot(new ZooKeeperInstance(getCluster().getClientConfig())) + Constants.ZGC_LOCK; + ZooReaderWriter zk = new ZooReaderWriter(cluster.getZooKeepers(), 30000, OUR_SECRET); + try { + ZooLock.deleteLock(zk, path); + } catch (IllegalStateException e) { + + } + + assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)); + } + + @Test + public void gcTest() throws Exception { + killMacGc(); + Connector c = getConnector(); + c.tableOperations().create("test_ingest"); + c.tableOperations().setProperty("test_ingest", Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K"); + TestIngest.Opts opts = new TestIngest.Opts(); + VerifyIngest.Opts vopts = new VerifyIngest.Opts(); + vopts.rows = opts.rows = 10000; + vopts.cols = opts.cols = 1; + opts.setPrincipal("root"); + vopts.setPrincipal("root"); + TestIngest.ingest(c, cluster.getFileSystem(), opts, new BatchWriterOpts()); + c.tableOperations().compact("test_ingest", null, null, true, true); + int before = countFiles(); + while (true) { + sleepUninterruptibly(1, TimeUnit.SECONDS); + int more = countFiles(); + if (more <= before) + break; + before = more; + } + + // restart GC + getCluster().start(); + sleepUninterruptibly(15, TimeUnit.SECONDS); + int after = countFiles(); + VerifyIngest.verifyIngest(c, vopts, new ScannerOpts()); + assertTrue(after < before); + } + + @Test + public void gcLotsOfCandidatesIT() throws Exception { + killMacGc(); + + log.info("Filling metadata table with bogus delete flags"); + Connector c = getConnector(); + addEntries(c, new BatchWriterOpts()); + cluster.getConfig().setDefaultMemory(10, MemoryUnit.MEGABYTE); + Process gc = cluster.exec(SimpleGarbageCollector.class); + sleepUninterruptibly(20, TimeUnit.SECONDS); + String output = ""; + while (!output.contains("delete candidates has exceeded")) { + byte buffer[] = new byte[10 * 1024]; + try { + int n = gc.getInputStream().read(buffer); + output = new String(buffer, 0, n, UTF_8); + } catch (IOException ex) { + break; + } + } + gc.destroy(); + assertTrue(output.contains("delete candidates has exceeded")); + } + + @Test + public void dontGCRootLog() throws Exception { + killMacGc(); + // dirty metadata + Connector c = getConnector(); + String table = getUniqueNames(1)[0]; + c.tableOperations().create(table); + // let gc run for a bit + cluster.start(); + sleepUninterruptibly(20, TimeUnit.SECONDS); + killMacGc(); + // kill tservers + for (ProcessReference ref : cluster.getProcesses().get(ServerType.TABLET_SERVER)) { + cluster.killProcess(ServerType.TABLET_SERVER, ref); + } + // run recovery + cluster.start(); + // did it recover? + Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + Iterators.size(scanner.iterator()); + } + + private Mutation createDelMutation(String path, String cf, String cq, String val) { + Text row = new Text(MetadataSchema.DeletesSection.getRowPrefix() + path); + Mutation delFlag = new Mutation(row); + delFlag.put(cf, cq, val); + return delFlag; + } + + @Test + public void testInvalidDelete() throws Exception { + killMacGc(); + + String table = getUniqueNames(1)[0]; + getConnector().tableOperations().create(table); + + BatchWriter bw2 = getConnector().createBatchWriter(table, new BatchWriterConfig()); + Mutation m1 = new Mutation("r1"); + m1.put("cf1", "cq1", "v1"); + bw2.addMutation(m1); + bw2.close(); + + getConnector().tableOperations().flush(table, null, null, true); + + // ensure an invalid delete entry does not cause GC to go berserk ACCUMULO-2520 + getConnector().securityOperations().grantTablePermission(getConnector().whoami(), MetadataTable.NAME, TablePermission.WRITE); + BatchWriter bw3 = getConnector().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + bw3.addMutation(createDelMutation("", "", "", "")); + bw3.addMutation(createDelMutation("", "testDel", "test", "valueTest")); + bw3.addMutation(createDelMutation("/", "", "", "")); + bw3.close(); + + Process gc = cluster.exec(SimpleGarbageCollector.class); + try { + String output = ""; + while (!output.contains("Ingoring invalid deletion candidate")) { + sleepUninterruptibly(250, TimeUnit.MILLISECONDS); + try { + output = FunctionalTestUtils.readAll(cluster, SimpleGarbageCollector.class, gc); + } catch (IOException ioe) { + log.error("Could not read all from cluster.", ioe); + } + } + } finally { + gc.destroy(); + } + + Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY); + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + assertTrue(iter.hasNext()); + Entry<Key,Value> entry = iter.next(); + Assert.assertEquals("r1", entry.getKey().getRow().toString()); + Assert.assertEquals("cf1", entry.getKey().getColumnFamily().toString()); + Assert.assertEquals("cq1", entry.getKey().getColumnQualifier().toString()); + Assert.assertEquals("v1", entry.getValue().toString()); + Assert.assertFalse(iter.hasNext()); + } + + @Test + public void testProperPortAdvertisement() throws Exception { + + Connector conn = getConnector(); + Instance instance = conn.getInstance(); + + ZooReaderWriter zk = new ZooReaderWriter(cluster.getZooKeepers(), 30000, OUR_SECRET); + String path = ZooUtil.getRoot(instance) + Constants.ZGC_LOCK; + for (int i = 0; i < 5; i++) { + List<String> locks; + try { + locks = zk.getChildren(path, null); + } catch (NoNodeException e) { + Thread.sleep(5000); + continue; + } + + if (locks != null && locks.size() > 0) { + Collections.sort(locks); + + String lockPath = path + "/" + locks.get(0); + + String gcLoc = new String(zk.getData(lockPath, null)); + + Assert.assertTrue("Found unexpected data in zookeeper for GC location: " + gcLoc, gcLoc.startsWith(Service.GC_CLIENT.name())); + int loc = gcLoc.indexOf(ServerServices.SEPARATOR_CHAR); + Assert.assertNotEquals("Could not find split point of GC location for: " + gcLoc, -1, loc); + String addr = gcLoc.substring(loc + 1); + + int addrSplit = addr.indexOf(':'); + Assert.assertNotEquals("Could not find split of GC host:port for: " + addr, -1, addrSplit); + + String host = addr.substring(0, addrSplit), port = addr.substring(addrSplit + 1); + // We shouldn't have the "bindall" address in zk + Assert.assertNotEquals("0.0.0.0", host); + // Nor should we have the "random port" in zk + Assert.assertNotEquals(0, Integer.parseInt(port)); + return; + } + + Thread.sleep(5000); + } + + Assert.fail("Could not find advertised GC address"); + } + + private int countFiles() throws Exception { + Path path = new Path(cluster.getConfig().getDir() + "/accumulo/tables/1/*/*.rf"); + return Iterators.size(Arrays.asList(cluster.getFileSystem().globStatus(path)).iterator()); + } + + public static void addEntries(Connector conn, BatchWriterOpts bwOpts) throws Exception { + conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE); + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, bwOpts.getBatchWriterConfig()); + + for (int i = 0; i < 100000; ++i) { + final Text emptyText = new Text(""); + Text row = new Text(String.format("%s/%020d/%s", MetadataSchema.DeletesSection.getRowPrefix(), i, + "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj")); + Mutation delFlag = new Mutation(row); + delFlag.put(emptyText, emptyText, new Value(new byte[] {})); + bw.addMutation(delFlag); + } + bw.close(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/KerberosIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/KerberosIT.java index 6b6108a,0000000..f7a151e mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/KerberosIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/KerberosIT.java @@@ -1,644 -1,0 +1,644 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.DelegationTokenConfig; +import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; +import org.apache.accumulo.core.client.impl.DelegationTokenImpl; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.core.security.SystemPermission; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.harness.AccumuloITBase; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.accumulo.harness.TestingKdc; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +/** + * MAC test which uses {@link MiniKdc} to simulate ta secure environment. Can be used as a sanity check for Kerberos/SASL testing. + */ +public class KerberosIT extends AccumuloITBase { + private static final Logger log = LoggerFactory.getLogger(KerberosIT.class); + + private static TestingKdc kdc; + private static String krbEnabledForITs = null; + private static ClusterUser rootUser; + + @BeforeClass + public static void startKdc() throws Exception { + kdc = new TestingKdc(); + kdc.start(); + krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION); + if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true"); + } + rootUser = kdc.getRootUser(); + } + + @AfterClass + public static void stopKdc() throws Exception { + if (null != kdc) { + kdc.stop(); + } + if (null != krbEnabledForITs) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs); + } + UserGroupInformation.setConfiguration(new Configuration(false)); + } + + @Override + public int defaultTimeoutSeconds() { + return 60 * 5; + } + + private MiniAccumuloClusterImpl mac; + + @Before + public void startMac() throws Exception { + MiniClusterHarness harness = new MiniClusterHarness(); + mac = harness.create(this, new PasswordToken("unused"), kdc, new MiniClusterConfigurationCallback() { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + Map<String,String> site = cfg.getSiteConfig(); - site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s"); ++ site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); + cfg.setSiteConfig(site); + } + + }); + + mac.getConfig().setNumTservers(1); + mac.start(); + // Enabled kerberos auth + Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + } + + @After + public void stopMac() throws Exception { + if (null != mac) { + mac.stop(); + } + } + + @Test + public void testAdminUser() throws Exception { + // Login as the client (provided to `accumulo init` as the "root" user) + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + final Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + + // The "root" user should have all system permissions + for (SystemPermission perm : SystemPermission.values()) { + assertTrue("Expected user to have permission: " + perm, conn.securityOperations().hasSystemPermission(conn.whoami(), perm)); + } + + // and the ability to modify the root and metadata tables + for (String table : Arrays.asList(RootTable.NAME, MetadataTable.NAME)) { + assertTrue(conn.securityOperations().hasTablePermission(conn.whoami(), table, TablePermission.ALTER_TABLE)); + } + return null; + } + }); + } + + @Test + public void testNewUser() throws Exception { + String newUser = testName.getMethodName(); + final File newUserKeytab = new File(kdc.getKeytabDir(), newUser + ".keytab"); + if (newUserKeytab.exists() && !newUserKeytab.delete()) { + log.warn("Unable to delete {}", newUserKeytab); + } + + // Create a new user + kdc.createPrincipal(newUserKeytab, newUser); + + final String newQualifiedUser = kdc.qualifyUser(newUser); + final HashSet<String> users = Sets.newHashSet(rootUser.getPrincipal()); + + // Login as the "root" user + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + log.info("Logged in as {}", rootUser.getPrincipal()); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + + // Make sure the system user doesn't exist -- this will force some RPC to happen server-side + createTableWithDataAndCompact(conn); + + assertEquals(users, conn.securityOperations().listLocalUsers()); + + return null; + } + }); + // Switch to a new user + ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(newQualifiedUser, newUserKeytab.getAbsolutePath()); + log.info("Logged in as {}", newQualifiedUser); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Connector conn = mac.getConnector(newQualifiedUser, new KerberosToken()); + log.info("Created connector as {}", newQualifiedUser); + assertEquals(newQualifiedUser, conn.whoami()); + + // The new user should have no system permissions + for (SystemPermission perm : SystemPermission.values()) { + assertFalse(conn.securityOperations().hasSystemPermission(newQualifiedUser, perm)); + } + + users.add(newQualifiedUser); + + // Same users as before, plus the new user we just created + assertEquals(users, conn.securityOperations().listLocalUsers()); + return null; + } + + }); + } + + @Test + public void testUserPrivilegesThroughGrant() throws Exception { + String user1 = testName.getMethodName(); + final File user1Keytab = new File(kdc.getKeytabDir(), user1 + ".keytab"); + if (user1Keytab.exists() && !user1Keytab.delete()) { + log.warn("Unable to delete {}", user1Keytab); + } + + // Create some new users + kdc.createPrincipal(user1Keytab, user1); + + final String qualifiedUser1 = kdc.qualifyUser(user1); + + // Log in as user1 + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(user1, user1Keytab.getAbsolutePath()); + log.info("Logged in as {}", user1); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + // Indirectly creates this user when we use it + Connector conn = mac.getConnector(qualifiedUser1, new KerberosToken()); + log.info("Created connector as {}", qualifiedUser1); + + // The new user should have no system permissions + for (SystemPermission perm : SystemPermission.values()) { + assertFalse(conn.securityOperations().hasSystemPermission(qualifiedUser1, perm)); + } + + return null; + } + }); + + ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + conn.securityOperations().grantSystemPermission(qualifiedUser1, SystemPermission.CREATE_TABLE); + return null; + } + }); + + // Switch back to the original user + ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(user1, user1Keytab.getAbsolutePath()); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Connector conn = mac.getConnector(qualifiedUser1, new KerberosToken()); + + // Shouldn't throw an exception since we granted the create table permission + final String table = testName.getMethodName() + "_user_table"; + conn.tableOperations().create(table); + + // Make sure we can actually use the table we made + BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); + Mutation m = new Mutation("a"); + m.put("b", "c", "d"); + bw.addMutation(m); + bw.close(); + + conn.tableOperations().compact(table, new CompactionConfig().setWait(true).setFlush(true)); + return null; + } + }); + } + + @Test + public void testUserPrivilegesForTable() throws Exception { + String user1 = testName.getMethodName(); + final File user1Keytab = new File(kdc.getKeytabDir(), user1 + ".keytab"); + if (user1Keytab.exists() && !user1Keytab.delete()) { + log.warn("Unable to delete {}", user1Keytab); + } + + // Create some new users -- cannot contain realm + kdc.createPrincipal(user1Keytab, user1); + + final String qualifiedUser1 = kdc.qualifyUser(user1); + + // Log in as user1 + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(qualifiedUser1, user1Keytab.getAbsolutePath()); + log.info("Logged in as {}", user1); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + // Indirectly creates this user when we use it + Connector conn = mac.getConnector(qualifiedUser1, new KerberosToken()); + log.info("Created connector as {}", qualifiedUser1); + + // The new user should have no system permissions + for (SystemPermission perm : SystemPermission.values()) { + assertFalse(conn.securityOperations().hasSystemPermission(qualifiedUser1, perm)); + } + return null; + } + + }); + + final String table = testName.getMethodName() + "_user_table"; + final String viz = "viz"; + + ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + conn.tableOperations().create(table); + // Give our unprivileged user permission on the table we made for them + conn.securityOperations().grantTablePermission(qualifiedUser1, table, TablePermission.READ); + conn.securityOperations().grantTablePermission(qualifiedUser1, table, TablePermission.WRITE); + conn.securityOperations().grantTablePermission(qualifiedUser1, table, TablePermission.ALTER_TABLE); + conn.securityOperations().grantTablePermission(qualifiedUser1, table, TablePermission.DROP_TABLE); + conn.securityOperations().changeUserAuthorizations(qualifiedUser1, new Authorizations(viz)); + return null; + } + }); + + // Switch back to the original user + ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(qualifiedUser1, user1Keytab.getAbsolutePath()); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Connector conn = mac.getConnector(qualifiedUser1, new KerberosToken()); + + // Make sure we can actually use the table we made + + // Write data + final long ts = 1000l; + BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); + Mutation m = new Mutation("a"); + m.put("b", "c", new ColumnVisibility(viz.getBytes()), ts, "d"); + bw.addMutation(m); + bw.close(); + + // Compact + conn.tableOperations().compact(table, new CompactionConfig().setWait(true).setFlush(true)); + + // Alter + conn.tableOperations().setProperty(table, Property.TABLE_BLOOM_ENABLED.getKey(), "true"); + + // Read (and proper authorizations) + Scanner s = conn.createScanner(table, new Authorizations(viz)); + Iterator<Entry<Key,Value>> iter = s.iterator(); + assertTrue("No results from iterator", iter.hasNext()); + Entry<Key,Value> entry = iter.next(); + assertEquals(new Key("a", "b", "c", viz, ts), entry.getKey()); + assertEquals(new Value("d".getBytes()), entry.getValue()); + assertFalse("Had more results from iterator", iter.hasNext()); + return null; + } + }); + } + + @Test + public void testDelegationToken() throws Exception { + final String tableName = getUniqueNames(1)[0]; + + // Login as the "root" user + UserGroupInformation root = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + log.info("Logged in as {}", rootUser.getPrincipal()); + + final int numRows = 100, numColumns = 10; + + // As the "root" user, open up the connection and get a delegation token + final AuthenticationToken delegationToken = root.doAs(new PrivilegedExceptionAction<AuthenticationToken>() { + @Override + public AuthenticationToken run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + + conn.tableOperations().create(tableName); + BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + for (int r = 0; r < numRows; r++) { + Mutation m = new Mutation(Integer.toString(r)); + for (int c = 0; c < numColumns; c++) { + String col = Integer.toString(c); + m.put(col, col, col); + } + bw.addMutation(m); + } + bw.close(); + + return conn.securityOperations().getDelegationToken(new DelegationTokenConfig()); + } + }); + + // The above login with keytab doesn't have a way to logout, so make a fake user that won't have krb credentials + UserGroupInformation userWithoutPrivs = UserGroupInformation.createUserForTesting("fake_user", new String[0]); + int recordsSeen = userWithoutPrivs.doAs(new PrivilegedExceptionAction<Integer>() { + @Override + public Integer run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), delegationToken); + + BatchScanner bs = conn.createBatchScanner(tableName, Authorizations.EMPTY, 2); + bs.setRanges(Collections.singleton(new Range())); + int recordsSeen = Iterables.size(bs); + bs.close(); + return recordsSeen; + } + }); + + assertEquals(numRows * numColumns, recordsSeen); + } + + @Test + public void testDelegationTokenAsDifferentUser() throws Exception { + // Login as the "root" user + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + log.info("Logged in as {}", rootUser.getPrincipal()); + + final AuthenticationToken delegationToken; + try { + delegationToken = ugi.doAs(new PrivilegedExceptionAction<AuthenticationToken>() { + @Override + public AuthenticationToken run() throws Exception { + // As the "root" user, open up the connection and get a delegation token + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + return conn.securityOperations().getDelegationToken(new DelegationTokenConfig()); + } + }); + } catch (UndeclaredThrowableException ex) { + throw ex; + } + + // make a fake user that won't have krb credentials + UserGroupInformation userWithoutPrivs = UserGroupInformation.createUserForTesting("fake_user", new String[0]); + try { + // Use the delegation token to try to log in as a different user + userWithoutPrivs.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + mac.getConnector("some_other_user", delegationToken); + return null; + } + }); + fail("Using a delegation token as a different user should throw an exception"); + } catch (UndeclaredThrowableException e) { + Throwable cause = e.getCause(); + assertNotNull(cause); + // We should get an AccumuloSecurityException from trying to use a delegation token for the wrong user + assertTrue("Expected cause to be AccumuloSecurityException, but was " + cause.getClass(), cause instanceof AccumuloSecurityException); + } + } + + @Test + public void testGetDelegationTokenDenied() throws Exception { + String newUser = testName.getMethodName(); + final File newUserKeytab = new File(kdc.getKeytabDir(), newUser + ".keytab"); + if (newUserKeytab.exists() && !newUserKeytab.delete()) { + log.warn("Unable to delete {}", newUserKeytab); + } + + // Create a new user + kdc.createPrincipal(newUserKeytab, newUser); + + final String qualifiedNewUser = kdc.qualifyUser(newUser); + + // Login as a normal user + UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(qualifiedNewUser, newUserKeytab.getAbsolutePath()); + try { + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + // As the "root" user, open up the connection and get a delegation token + Connector conn = mac.getConnector(qualifiedNewUser, new KerberosToken()); + log.info("Created connector as {}", qualifiedNewUser); + assertEquals(qualifiedNewUser, conn.whoami()); + + conn.securityOperations().getDelegationToken(new DelegationTokenConfig()); + return null; + } + }); + } catch (UndeclaredThrowableException ex) { + assertTrue(ex.getCause() instanceof AccumuloSecurityException); + } + } + + @Test + public void testRestartedMasterReusesSecretKey() throws Exception { + // Login as the "root" user + UserGroupInformation root = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + log.info("Logged in as {}", rootUser.getPrincipal()); + + // As the "root" user, open up the connection and get a delegation token + final AuthenticationToken delegationToken1 = root.doAs(new PrivilegedExceptionAction<AuthenticationToken>() { + @Override + public AuthenticationToken run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + + AuthenticationToken token = conn.securityOperations().getDelegationToken(new DelegationTokenConfig()); + + assertTrue("Could not get tables with delegation token", mac.getConnector(rootUser.getPrincipal(), token).tableOperations().list().size() > 0); + + return token; + } + }); + + log.info("Stopping master"); + mac.getClusterControl().stop(ServerType.MASTER); + Thread.sleep(5000); + log.info("Restarting master"); + mac.getClusterControl().start(ServerType.MASTER); + + // Make sure our original token is still good + root.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), delegationToken1); + + assertTrue("Could not get tables with delegation token", conn.tableOperations().list().size() > 0); + + return null; + } + }); + + // Get a new token, so we can compare the keyId on the second to the first + final AuthenticationToken delegationToken2 = root.doAs(new PrivilegedExceptionAction<AuthenticationToken>() { + @Override + public AuthenticationToken run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + + AuthenticationToken token = conn.securityOperations().getDelegationToken(new DelegationTokenConfig()); + + assertTrue("Could not get tables with delegation token", mac.getConnector(rootUser.getPrincipal(), token).tableOperations().list().size() > 0); + + return token; + } + }); + + // A restarted master should reuse the same secret key after a restart if the secret key hasn't expired (1day by default) + DelegationTokenImpl dt1 = (DelegationTokenImpl) delegationToken1; + DelegationTokenImpl dt2 = (DelegationTokenImpl) delegationToken2; + assertEquals(dt1.getIdentifier().getKeyId(), dt2.getIdentifier().getKeyId()); + } + + @Test(expected = AccumuloException.class) + public void testDelegationTokenWithInvalidLifetime() throws Throwable { + // Login as the "root" user + UserGroupInformation root = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + log.info("Logged in as {}", rootUser.getPrincipal()); + + // As the "root" user, open up the connection and get a delegation token + try { + root.doAs(new PrivilegedExceptionAction<AuthenticationToken>() { + @Override + public AuthenticationToken run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + + // Should fail + return conn.securityOperations().getDelegationToken(new DelegationTokenConfig().setTokenLifetime(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); + } + }); + } catch (UndeclaredThrowableException e) { + Throwable cause = e.getCause(); + if (null != cause) { + throw cause; + } else { + throw e; + } + } + } + + @Test + public void testDelegationTokenWithReducedLifetime() throws Throwable { + // Login as the "root" user + UserGroupInformation root = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + log.info("Logged in as {}", rootUser.getPrincipal()); + + // As the "root" user, open up the connection and get a delegation token + final AuthenticationToken dt = root.doAs(new PrivilegedExceptionAction<AuthenticationToken>() { + @Override + public AuthenticationToken run() throws Exception { + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + + return conn.securityOperations().getDelegationToken(new DelegationTokenConfig().setTokenLifetime(5, TimeUnit.MINUTES)); + } + }); + + AuthenticationTokenIdentifier identifier = ((DelegationTokenImpl) dt).getIdentifier(); + assertTrue("Expected identifier to expire in no more than 5 minutes: " + identifier, + identifier.getExpirationDate() - identifier.getIssueDate() <= (5 * 60 * 1000)); + } + + /** + * Creates a table, adds a record to it, and then compacts the table. A simple way to make sure that the system user exists (since the master does an RPC to + * the tserver which will create the system user if it doesn't already exist). + */ + private void createTableWithDataAndCompact(Connector conn) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, TableExistsException { + final String table = testName.getMethodName() + "_table"; + conn.tableOperations().create(table); + BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); + Mutation m = new Mutation("a"); + m.put("b", "c", "d"); + bw.addMutation(m); + bw.close(); + conn.tableOperations().compact(table, new CompactionConfig().setFlush(true).setWait(true)); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java index cf55683,0000000..142a8bb mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java @@@ -1,188 -1,0 +1,188 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloITBase; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.accumulo.harness.TestingKdc; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; + +/** + * MAC test which uses {@link MiniKdc} to simulate ta secure environment. Can be used as a sanity check for Kerberos/SASL testing. + */ +public class KerberosRenewalIT extends AccumuloITBase { + private static final Logger log = LoggerFactory.getLogger(KerberosRenewalIT.class); + + private static TestingKdc kdc; + private static String krbEnabledForITs = null; + private static ClusterUser rootUser; + + private static final long TICKET_LIFETIME = 6 * 60 * 1000; // Anything less seems to fail when generating the ticket + private static final long TICKET_TEST_LIFETIME = 8 * 60 * 1000; // Run a test for 8 mins + private static final long TEST_DURATION = 9 * 60 * 1000; // The test should finish within 9 mins + + @BeforeClass + public static void startKdc() throws Exception { + // 30s renewal time window + kdc = new TestingKdc(TestingKdc.computeKdcDir(), TestingKdc.computeKeytabDir(), TICKET_LIFETIME); + kdc.start(); + krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION); + if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true"); + } + rootUser = kdc.getRootUser(); + } + + @AfterClass + public static void stopKdc() throws Exception { + if (null != kdc) { + kdc.stop(); + } + if (null != krbEnabledForITs) { + System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs); + } + } + + @Override + public int defaultTimeoutSeconds() { + return (int) TEST_DURATION / 1000; + } + + private MiniAccumuloClusterImpl mac; + + @Before + public void startMac() throws Exception { + MiniClusterHarness harness = new MiniClusterHarness(); + mac = harness.create(this, new PasswordToken("unused"), kdc, new MiniClusterConfigurationCallback() { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + Map<String,String> site = cfg.getSiteConfig(); - site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s"); ++ site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); + // Reduce the period just to make sure we trigger renewal fast + site.put(Property.GENERAL_KERBEROS_RENEWAL_PERIOD.getKey(), "5s"); + cfg.setSiteConfig(site); + } + + }); + + mac.getConfig().setNumTservers(1); + mac.start(); + // Enabled kerberos auth + Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + } + + @After + public void stopMac() throws Exception { + if (null != mac) { + mac.stop(); + } + } + + // Intentially setting the Test annotation timeout. We do not want to scale the timeout. + @Test(timeout = TEST_DURATION) + public void testReadAndWriteThroughTicketLifetime() throws Exception { + // Attempt to use Accumulo for a duration of time that exceeds the Kerberos ticket lifetime. + // This is a functional test to verify that Accumulo services renew their ticket. + // If the test doesn't finish on its own, this signifies that Accumulo services failed + // and the test should fail. If Accumulo services renew their ticket, the test case + // should exit gracefully on its own. + + // Login as the "root" user + UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath()); + log.info("Logged in as {}", rootUser.getPrincipal()); + + Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken()); + log.info("Created connector as {}", rootUser.getPrincipal()); + assertEquals(rootUser.getPrincipal(), conn.whoami()); + + long duration = 0; + long last = System.currentTimeMillis(); + // Make sure we have a couple renewals happen + while (duration < TICKET_TEST_LIFETIME) { + // Create a table, write a record, compact, read the record, drop the table. + createReadWriteDrop(conn); + // Wait a bit after + Thread.sleep(5000); + + // Update the duration + long now = System.currentTimeMillis(); + duration += now - last; + last = now; + } + } + + /** + * Creates a table, adds a record to it, and then compacts the table. A simple way to make sure that the system user exists (since the master does an RPC to + * the tserver which will create the system user if it doesn't already exist). + */ + private void createReadWriteDrop(Connector conn) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, TableExistsException { + final String table = testName.getMethodName() + "_table"; + conn.tableOperations().create(table); + BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); + Mutation m = new Mutation("a"); + m.put("b", "c", "d"); + bw.addMutation(m); + bw.close(); + conn.tableOperations().compact(table, new CompactionConfig().setFlush(true).setWait(true)); + Scanner s = conn.createScanner(table, Authorizations.EMPTY); + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + assertEquals("Did not find the expected key", 0, new Key("a", "b", "c").compareTo(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL)); + assertEquals("d", entry.getValue().toString()); + conn.tableOperations().delete(table); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java index 3489c26,0000000..8ac67d9 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MasterFailoverIT.java @@@ -1,80 -1,0 +1,80 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import java.util.Map; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +public class MasterFailoverIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map<String,String> siteConfig = cfg.getSiteConfig(); - siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s"); ++ siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); + cfg.setSiteConfig(siteConfig); + } + + @Override + protected int defaultTimeoutSeconds() { + return 90; + } + + @Test + public void test() throws Exception { + Connector c = getConnector(); + String[] names = getUniqueNames(2); + c.tableOperations().create(names[0]); + TestIngest.Opts opts = new TestIngest.Opts(); + opts.setTableName(names[0]); + ClientConfiguration clientConf = cluster.getClientConfig(); + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConf); + } else { + opts.setPrincipal(getAdminPrincipal()); + } + TestIngest.ingest(c, opts, new BatchWriterOpts()); + + ClusterControl control = cluster.getClusterControl(); + control.stopAllServers(ServerType.MASTER); + // start up a new one + control.startAllServers(ServerType.MASTER); + // talk to it + c.tableOperations().rename(names[0], names[1]); + VerifyIngest.Opts vopts = new VerifyIngest.Opts(); + vopts.setTableName(names[1]); + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + vopts.updateKerberosCredentials(clientConf); + } else { + vopts.setPrincipal(getAdminPrincipal()); + } + VerifyIngest.verifyIngest(c, vopts, new ScannerOpts()); + } +}
