http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/VolumeIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/VolumeIT.java index a338d5f,0000000..d5c940d mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java +++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java @@@ -1,553 -1,0 +1,553 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + ++import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.admin.DiskUsage; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.util.CachedConfiguration; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.init.Initialize; +import org.apache.accumulo.server.util.Admin; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.apache.zookeeper.ZooKeeper; +import org.junit.Assert; +import org.junit.Test; + +public class VolumeIT extends ConfigurableMacIT { + + private static final Text EMPTY = new Text(); + private static final Value EMPTY_VALUE = new Value(new byte[] {}); + private File volDirBase; + private Path v1, v2; + + @Override + protected int defaultTimeoutSeconds() { + return 5 * 60; + } + + @SuppressWarnings("deprecation") + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + File baseDir = cfg.getDir(); + volDirBase = new File(baseDir, "volumes"); + File v1f = new File(volDirBase, "v1"); + File v2f = new File(volDirBase, "v2"); + v1f.mkdir(); + v2f.mkdir(); + v1 = new Path("file://" + v1f.getAbsolutePath()); + v2 = new Path("file://" + v2f.getAbsolutePath()); + + // Run MAC on two locations in the local file system + URI v1Uri = v1.toUri(); + cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath()); + cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost()); + cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString()); + + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + + super.configure(cfg, hadoopCoreSite); + } + + @Test + public void test() throws Exception { + // create a table + Connector connector = getConnector(); + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName); + SortedSet<Text> partitions = new TreeSet<Text>(); + // with some splits + for (String s : "d,m,t".split(",")) + partitions.add(new Text(s)); + connector.tableOperations().addSplits(tableName, partitions); + // scribble over the splits + BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig()); + String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(","); + for (String s : rows) { + Mutation m = new Mutation(new Text(s)); + m.put(EMPTY, EMPTY, EMPTY_VALUE); + bw.addMutation(m); + } + bw.close(); + // write the data to disk, read it back + connector.tableOperations().flush(tableName, null, null, true); + Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY); + int i = 0; + for (Entry<Key,Value> entry : scanner) { + assertEquals(rows[i++], entry.getKey().getRow().toString()); + } + // verify the new files are written to the different volumes + scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(new Range("1", "1<")); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + int fileCount = 0; + + for (Entry<Key,Value> entry : scanner) { + boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString()); + boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString()); + assertTrue(inV1 || inV2); + fileCount++; + } + assertEquals(4, fileCount); + List<DiskUsage> diskUsage = connector.tableOperations().getDiskUsage(Collections.singleton(tableName)); + assertEquals(1, diskUsage.size()); + long usage = diskUsage.get(0).getUsage().longValue(); + System.out.println("usage " + usage); + assertTrue(usage > 700 && usage < 800); + } + + private void verifyData(List<String> expected, Scanner createScanner) { + + List<String> actual = new ArrayList<String>(); + + for (Entry<Key,Value> entry : createScanner) { + Key k = entry.getKey(); + actual.add(k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":" + entry.getValue()); + } + + Collections.sort(expected); + Collections.sort(actual); + + Assert.assertEquals(expected, actual); + } + + @Test + public void testRelativePaths() throws Exception { + + List<String> expected = new ArrayList<String>(); + + Connector connector = getConnector(); + String tableName = getUniqueNames(1)[0]; + connector.tableOperations().create(tableName, false); + + String tableId = connector.tableOperations().tableIdMap().get(tableName); + + SortedSet<Text> partitions = new TreeSet<Text>(); + // with some splits + for (String s : "c,g,k,p,s,v".split(",")) + partitions.add(new Text(s)); + + connector.tableOperations().addSplits(tableName, partitions); + + BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig()); + + // create two files in each tablet + + String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(","); + for (String s : rows) { + Mutation m = new Mutation(s); + m.put("cf1", "cq1", "1"); + bw.addMutation(m); + expected.add(s + ":cf1:cq1:1"); + } + + bw.flush(); + connector.tableOperations().flush(tableName, null, null, true); + + for (String s : rows) { + Mutation m = new Mutation(s); + m.put("cf1", "cq1", "2"); + bw.addMutation(m); + expected.add(s + ":cf1:cq1:2"); + } + + bw.close(); + connector.tableOperations().flush(tableName, null, null, true); + + verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY)); + + connector.tableOperations().offline(tableName, true); + + connector.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE); + + Scanner metaScanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); + + BatchWriter mbw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + + for (Entry<Key,Value> entry : metaScanner) { + String cq = entry.getKey().getColumnQualifier().toString(); + if (cq.startsWith(v1.toString())) { + Path path = new Path(cq); + String relPath = "/" + path.getParent().getName() + "/" + path.getName(); + Mutation fileMut = new Mutation(entry.getKey().getRow()); + fileMut.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); + fileMut.put(entry.getKey().getColumnFamily().toString(), relPath, entry.getValue().toString()); + mbw.addMutation(fileMut); + } + } + + mbw.close(); + + connector.tableOperations().online(tableName, true); + + verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY)); + + connector.tableOperations().compact(tableName, null, null, true, true); + + verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY)); + + for (Entry<Key,Value> entry : metaScanner) { + String cq = entry.getKey().getColumnQualifier().toString(); + Path path = new Path(cq); + Assert.assertTrue("relative path not deleted " + path.toString(), path.depth() > 2); + } + + } + + @Test + public void testAddVolumes() throws Exception { + + String[] tableNames = getUniqueNames(2); + + // grab this before shutting down cluster + String uuid = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()).getInstanceID(); + + verifyVolumesUsed(tableNames[0], false, v1, v2); + + Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor()); + cluster.stop(); + + Configuration conf = new Configuration(false); + conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml")); + + File v3f = new File(volDirBase, "v3"); + v3f.mkdir(); + Path v3 = new Path("file://" + v3f.getAbsolutePath()); + + conf.set(Property.INSTANCE_VOLUMES.getKey(), v1.toString() + "," + v2.toString() + "," + v3.toString()); + BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml"))); + conf.writeXml(fos); + fos.close(); + + // initialize volume + Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor()); + + // check that all volumes are initialized + for (Path volumePath : Arrays.asList(v1, v2, v3)) { + FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance()); + Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR); + FileStatus[] iids = fs.listStatus(vp); + Assert.assertEquals(1, iids.length); + Assert.assertEquals(uuid, iids[0].getPath().getName()); + } + + // start cluster and verify that new volume is used + cluster.start(); + + verifyVolumesUsed(tableNames[1], false, v1, v2, v3); + } + + @Test + public void testNonConfiguredVolumes() throws Exception { + + String[] tableNames = getUniqueNames(2); + + // grab this before shutting down cluster + String uuid = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()).getInstanceID(); + + verifyVolumesUsed(tableNames[0], false, v1, v2); + + Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor()); + cluster.stop(); + + Configuration conf = new Configuration(false); + conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml")); + + File v3f = new File(volDirBase, "v3"); + v3f.mkdir(); + Path v3 = new Path("file://" + v3f.getAbsolutePath()); + + conf.set(Property.INSTANCE_VOLUMES.getKey(), v2.toString() + "," + v3.toString()); + BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml"))); + conf.writeXml(fos); + fos.close(); + + // initialize volume + Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor()); + + // check that all volumes are initialized + for (Path volumePath : Arrays.asList(v1, v2, v3)) { + FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance()); + Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR); + FileStatus[] iids = fs.listStatus(vp); + Assert.assertEquals(1, iids.length); + Assert.assertEquals(uuid, iids[0].getPath().getName()); + } + + // start cluster and verify that new volume is used + cluster.start(); + + // Make sure we can still read the tables (tableNames[0] is very likely to have a file still on v1) + List<String> expected = new ArrayList<String>(); + for (int i = 0; i < 100; i++) { + String row = String.format("%06d", i * 100 + 3); + expected.add(row + ":cf1:cq1:1"); + } + + verifyData(expected, getConnector().createScanner(tableNames[0], Authorizations.EMPTY)); + + // v1 should not have any data for tableNames[1] + verifyVolumesUsed(tableNames[1], false, v2, v3); + } + + private void writeData(String tableName, Connector conn) throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, + MutationsRejectedException { + TreeSet<Text> splits = new TreeSet<Text>(); + for (int i = 1; i < 100; i++) { + splits.add(new Text(String.format("%06d", i * 100))); + } + + conn.tableOperations().create(tableName); + conn.tableOperations().addSplits(tableName, splits); + + BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig()); + for (int i = 0; i < 100; i++) { + String row = String.format("%06d", i * 100 + 3); + Mutation m = new Mutation(row); + m.put("cf1", "cq1", "1"); + bw.addMutation(m); + } + + bw.close(); + } + + private void verifyVolumesUsed(String tableName, boolean shouldExist, Path... paths) throws AccumuloException, AccumuloSecurityException, + TableExistsException, TableNotFoundException, MutationsRejectedException { + + Connector conn = getConnector(); + + List<String> expected = new ArrayList<String>(); + for (int i = 0; i < 100; i++) { + String row = String.format("%06d", i * 100 + 3); + expected.add(row + ":cf1:cq1:1"); + } + + if (!conn.tableOperations().exists(tableName)) { + Assert.assertFalse(shouldExist); + + writeData(tableName, conn); + + verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY)); + + conn.tableOperations().flush(tableName, null, null, true); + } + + verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY)); + + String tableId = conn.tableOperations().tableIdMap().get(tableName); + Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(metaScanner); + metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME); + metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); + + int counts[] = new int[paths.length]; + + outer: for (Entry<Key,Value> entry : metaScanner) { + String cf = entry.getKey().getColumnFamily().toString(); + String cq = entry.getKey().getColumnQualifier().toString(); + + String path; + if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME.toString())) + path = cq; + else + path = entry.getValue().toString(); + + for (int i = 0; i < paths.length; i++) { + if (path.startsWith(paths[i].toString())) { + counts[i]++; + continue outer; + } + } + + Assert.fail("Unexpected volume " + path); + } + + // if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes - + // 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18 + + int sum = 0; + for (int count : counts) { + Assert.assertTrue(count > 0); + sum += count; + } + + Assert.assertEquals(200, sum); + } + + @Test + public void testRemoveVolumes() throws Exception { + String[] tableNames = getUniqueNames(2); + + verifyVolumesUsed(tableNames[0], false, v1, v2); + + Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor()); + cluster.stop(); + + Configuration conf = new Configuration(false); + conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml")); + + conf.set(Property.INSTANCE_VOLUMES.getKey(), v2.toString()); + BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml"))); + conf.writeXml(fos); + fos.close(); + + // start cluster and verify that volume was decommisioned + cluster.start(); + + Connector conn = cluster.getConnector("root", ROOT_PASSWORD); + conn.tableOperations().compact(tableNames[0], null, null, true, true); + + verifyVolumesUsed(tableNames[0], true, v2); + + // check that root tablet is not on volume 1 + String zpath = ZooUtil.getRoot(new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers())) + RootTable.ZROOT_TABLET_PATH; + ZooKeeper zookeeper = new ZooKeeper(cluster.getZooKeepers(), 30000, null); - String rootTabletDir = new String(zookeeper.getData(zpath, false, null), Constants.UTF8); ++ String rootTabletDir = new String(zookeeper.getData(zpath, false, null), UTF_8); + Assert.assertTrue(rootTabletDir.startsWith(v2.toString())); + zookeeper.close(); + + conn.tableOperations().clone(tableNames[0], tableNames[1], true, new HashMap<String,String>(), new HashSet<String>()); + + conn.tableOperations().flush(MetadataTable.NAME, null, null, true); + conn.tableOperations().flush(RootTable.NAME, null, null, true); + + verifyVolumesUsed(tableNames[0], true, v2); + verifyVolumesUsed(tableNames[1], true, v2); + + } + + private void testReplaceVolume(boolean cleanShutdown) throws Exception { + String[] tableNames = getUniqueNames(3); + + verifyVolumesUsed(tableNames[0], false, v1, v2); + + // write to 2nd table, but do not flush data to disk before shutdown + writeData(tableNames[1], cluster.getConnector("root", ROOT_PASSWORD)); + + if (cleanShutdown) + Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor()); + + cluster.stop(); + + File v1f = new File(v1.toUri()); + File v8f = new File(new File(v1.getParent().toUri()), "v8"); + Assert.assertTrue("Failed to rename " + v1f + " to " + v8f, v1f.renameTo(v8f)); + Path v8 = new Path(v8f.toURI()); + + File v2f = new File(v2.toUri()); + File v9f = new File(new File(v2.getParent().toUri()), "v9"); + Assert.assertTrue("Failed to rename " + v2f + " to " + v9f, v2f.renameTo(v9f)); + Path v9 = new Path(v9f.toURI()); + + Configuration conf = new Configuration(false); + conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml")); + + conf.set(Property.INSTANCE_VOLUMES.getKey(), v8 + "," + v9); + conf.set(Property.INSTANCE_VOLUMES_REPLACEMENTS.getKey(), v1 + " " + v8 + "," + v2 + " " + v9); + BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml"))); + conf.writeXml(fos); + fos.close(); + + // start cluster and verify that volumes were replaced + cluster.start(); + + verifyVolumesUsed(tableNames[0], true, v8, v9); + verifyVolumesUsed(tableNames[1], true, v8, v9); + + // verify writes to new dir + getConnector().tableOperations().compact(tableNames[0], null, null, true, true); + getConnector().tableOperations().compact(tableNames[1], null, null, true, true); + + verifyVolumesUsed(tableNames[0], true, v8, v9); + verifyVolumesUsed(tableNames[1], true, v8, v9); + + // check that root tablet is not on volume 1 or 2 + String zpath = ZooUtil.getRoot(new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers())) + RootTable.ZROOT_TABLET_PATH; + ZooKeeper zookeeper = new ZooKeeper(cluster.getZooKeepers(), 30000, null); - String rootTabletDir = new String(zookeeper.getData(zpath, false, null), Constants.UTF8); ++ String rootTabletDir = new String(zookeeper.getData(zpath, false, null), UTF_8); + Assert.assertTrue(rootTabletDir.startsWith(v8.toString()) || rootTabletDir.startsWith(v9.toString())); + zookeeper.close(); + + getConnector().tableOperations().clone(tableNames[1], tableNames[2], true, new HashMap<String,String>(), new HashSet<String>()); + + getConnector().tableOperations().flush(MetadataTable.NAME, null, null, true); + getConnector().tableOperations().flush(RootTable.NAME, null, null, true); + + verifyVolumesUsed(tableNames[0], true, v8, v9); + verifyVolumesUsed(tableNames[1], true, v8, v9); + verifyVolumesUsed(tableNames[2], true, v8, v9); + } + + @Test + public void testCleanReplaceVolumes() throws Exception { + testReplaceVolume(true); + } + + @Test + public void testDirtyReplaceVolumes() throws Exception { + testReplaceVolume(false); + } +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/AddSplitIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/AddSplitIT.java index f657034,0000000..05de342 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/AddSplitIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/AddSplitIT.java @@@ -1,140 -1,0 +1,141 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.util.Collection; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.TreeSet; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class AddSplitIT extends SimpleMacIT { + + @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + @Test + public void addSplitTest() throws Exception { + + String tableName = getUniqueNames(1)[0]; + Connector c = getConnector(); + c.tableOperations().create(tableName); + + insertData(tableName, 1l); + + TreeSet<Text> splits = new TreeSet<Text>(); + splits.add(new Text(String.format("%09d", 333))); + splits.add(new Text(String.format("%09d", 666))); + + c.tableOperations().addSplits(tableName, splits); + + UtilWaitThread.sleep(100); + + Collection<Text> actualSplits = c.tableOperations().listSplits(tableName); + + if (!splits.equals(new TreeSet<Text>(actualSplits))) { + throw new Exception(splits + " != " + actualSplits); + } + + verifyData(tableName, 1l); + insertData(tableName, 2l); + + // did not clear splits on purpose, it should ignore existing split points + // and still create the three additional split points + + splits.add(new Text(String.format("%09d", 200))); + splits.add(new Text(String.format("%09d", 500))); + splits.add(new Text(String.format("%09d", 800))); + + c.tableOperations().addSplits(tableName, splits); + + UtilWaitThread.sleep(100); + + actualSplits = c.tableOperations().listSplits(tableName); + + if (!splits.equals(new TreeSet<Text>(actualSplits))) { + throw new Exception(splits + " != " + actualSplits); + } + + verifyData(tableName, 2l); + } + + private void verifyData(String tableName, long ts) throws Exception { + Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); + + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + for (int i = 0; i < 10000; i++) { + if (!iter.hasNext()) { + throw new Exception("row " + i + " not found"); + } + + Entry<Key,Value> entry = iter.next(); + + String row = String.format("%09d", i); + + if (!entry.getKey().getRow().equals(new Text(row))) { + throw new Exception("unexpected row " + entry.getKey() + " " + i); + } + + if (entry.getKey().getTimestamp() != ts) { + throw new Exception("unexpected ts " + entry.getKey() + " " + ts); + } + + if (Integer.parseInt(entry.getValue().toString()) != i) { + throw new Exception("unexpected value " + entry + " " + i); + } + } + + if (iter.hasNext()) { + throw new Exception("found more than expected " + iter.next()); + } + + } + + private void insertData(String tableName, long ts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException { + BatchWriter bw = getConnector().createBatchWriter(tableName, null); + + for (int i = 0; i < 10000; i++) { + String row = String.format("%09d", i); + + Mutation m = new Mutation(new Text(row)); - m.put(new Text("cf1"), new Text("cq1"), ts, new Value(Integer.toString(i).getBytes(Constants.UTF8))); ++ m.put(new Text("cf1"), new Text("cq1"), ts, new Value(Integer.toString(i).getBytes(UTF_8))); + bw.addMutation(m); + } + + bw.close(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java index 0139a10,0000000..9c4492e mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BadIteratorMincIT.java @@@ -1,110 -1,0 +1,110 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertEquals; + +import java.util.EnumSet; +import java.util.Map.Entry; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class BadIteratorMincIT extends SimpleMacIT { + + @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + @Test + public void test() throws Exception { + Connector c = getConnector(); + + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + IteratorSetting is = new IteratorSetting(30, BadIterator.class); + c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc)); + BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + + Mutation m = new Mutation(new Text("r1")); - m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(Constants.UTF8))); ++ m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8))); + + bw.addMutation(m); + bw.close(); + + c.tableOperations().flush(tableName, null, null, false); + UtilWaitThread.sleep(1000); + + // minc should fail, so there should be no files + FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 0, 0); + + // try to scan table + Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY); + int count = FunctionalTestUtils.count(scanner); + assertEquals("Did not see expected # entries " + count, 1, count); + + // remove the bad iterator + c.tableOperations().removeIterator(tableName, BadIterator.class.getSimpleName(), EnumSet.of(IteratorScope.minc)); + + UtilWaitThread.sleep(5000); + + // minc should complete + FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 1, 1); + + count = 0; + for (@SuppressWarnings("unused") + Entry<Key,Value> entry : scanner) { + count++; + } + + if (count != 1) + throw new Exception("Did not see expected # entries " + count); + + // now try putting bad iterator back and deleting the table + c.tableOperations().attachIterator(tableName, is, EnumSet.of(IteratorScope.minc)); + bw = c.createBatchWriter(tableName, new BatchWriterConfig()); + m = new Mutation(new Text("r2")); - m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(Constants.UTF8))); ++ m.put(new Text("acf"), new Text(tableName), new Value("1".getBytes(UTF_8))); + bw.addMutation(m); + bw.close(); + + // make sure property is given time to propagate + UtilWaitThread.sleep(500); + + c.tableOperations().flush(tableName, null, null, false); + + // make sure the flush has time to start + UtilWaitThread.sleep(1000); + + // this should not hang + c.tableOperations().delete(tableName); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java index c67407d,0000000..688a326 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BatchScanSplitIT.java @@@ -1,125 -1,0 +1,126 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map.Entry; +import java.util.Random; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class BatchScanSplitIT extends ConfigurableMacIT { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_MAJC_DELAY.getKey(), "0")); + } + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Test + public void test() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + + int numRows = 1 << 18; + + BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); + + for (int i = 0; i < numRows; i++) { + Mutation m = new Mutation(new Text(String.format("%09x", i))); - m.put(new Text("cf1"), new Text("cq1"), new Value(String.format("%016x", numRows - i).getBytes(Constants.UTF8))); ++ m.put(new Text("cf1"), new Text("cq1"), new Value(String.format("%016x", numRows - i).getBytes(UTF_8))); + bw.addMutation(m); + } + + bw.close(); + + getConnector().tableOperations().flush(tableName, null, null, true); + + getConnector().tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "4K"); + + Collection<Text> splits = getConnector().tableOperations().listSplits(tableName); + while (splits.size() < 2) { + UtilWaitThread.sleep(1); + splits = getConnector().tableOperations().listSplits(tableName); + } + + System.out.println("splits : " + splits); + + Random random = new Random(19011230); + HashMap<Text,Value> expected = new HashMap<Text,Value>(); + ArrayList<Range> ranges = new ArrayList<Range>(); + for (int i = 0; i < 100; i++) { + int r = random.nextInt(numRows); + Text row = new Text(String.format("%09x", r)); - expected.put(row, new Value(String.format("%016x", numRows - r).getBytes(Constants.UTF8))); ++ expected.put(row, new Value(String.format("%016x", numRows - r).getBytes(UTF_8))); + ranges.add(new Range(row)); + } + + // logger.setLevel(Level.TRACE); + + HashMap<Text,Value> found = new HashMap<Text,Value>(); + + for (int i = 0; i < 20; i++) { + BatchScanner bs = getConnector().createBatchScanner(tableName, Authorizations.EMPTY, 4); + + found.clear(); + + long t1 = System.currentTimeMillis(); + + bs.setRanges(ranges); + + for (Entry<Key,Value> entry : bs) { + found.put(entry.getKey().getRow(), entry.getValue()); + } + bs.close(); + + long t2 = System.currentTimeMillis(); + + log.info(String.format("rate : %06.2f%n", ranges.size() / ((t2 - t1) / 1000.0))); + + if (!found.equals(expected)) + throw new Exception("Found and expected differ " + found + " " + expected); + } + + splits = getConnector().tableOperations().listSplits(tableName); + log.info("splits : " + splits); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java index e2dd903,0000000..465936e mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java @@@ -1,181 -1,0 +1,182 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.TimeUnit; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class BatchWriterFlushIT extends SimpleMacIT { + + private static final int NUM_TO_FLUSH = 100000; + + @Override + protected int defaultTimeoutSeconds() { + return 90; + } + + @Test + public void run() throws Exception { + Connector c = getConnector(); + String[] tableNames = getUniqueNames(2); + String bwft = tableNames[0]; + c.tableOperations().create(bwft); + String bwlt = tableNames[1]; + c.tableOperations().create(bwlt); + runFlushTest(bwft); + runLatencyTest(bwlt); + + } + + private void runLatencyTest(String tableName) throws Exception { + // should automatically flush after 2 seconds + BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS)); + Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); + + Mutation m = new Mutation(new Text(String.format("r_%10d", 1))); - m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(Constants.UTF8))); ++ m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(UTF_8))); + bw.addMutation(m); + + UtilWaitThread.sleep(500); + + int count = 0; + for (@SuppressWarnings("unused") + Entry<Key,Value> entry : scanner) { + count++; + } + + if (count != 0) { + throw new Exception("Flushed too soon"); + } + + UtilWaitThread.sleep(1500); + + for (@SuppressWarnings("unused") + Entry<Key,Value> entry : scanner) { + count++; + } + + if (count != 1) { + throw new Exception("Did not flush"); + } + + bw.close(); + } + + private void runFlushTest(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException, + Exception { + BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); + Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); + Random r = new Random(); + + for (int i = 0; i < 4; i++) { + for (int j = 0; j < NUM_TO_FLUSH; j++) { + int row = i * NUM_TO_FLUSH + j; + + Mutation m = new Mutation(new Text(String.format("r_%10d", row))); + m.put(new Text("cf"), new Text("cq"), new Value(("" + row).getBytes())); + bw.addMutation(m); + } + + bw.flush(); + + // do a few random lookups into the data just flushed + + for (int k = 0; k < 10; k++) { + int rowToLookup = r.nextInt(NUM_TO_FLUSH) + i * NUM_TO_FLUSH; + + scanner.setRange(new Range(new Text(String.format("r_%10d", rowToLookup)))); + + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + if (!iter.hasNext()) + throw new Exception(" row " + rowToLookup + " not found after flush"); + + Entry<Key,Value> entry = iter.next(); + + if (iter.hasNext()) + throw new Exception("Scanner returned too much"); + + verifyEntry(rowToLookup, entry); + } + + // scan all data just flushed + scanner.setRange(new Range(new Text(String.format("r_%10d", i * NUM_TO_FLUSH)), true, new Text(String.format("r_%10d", (i + 1) * NUM_TO_FLUSH)), false)); + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + for (int j = 0; j < NUM_TO_FLUSH; j++) { + int row = i * NUM_TO_FLUSH + j; + + if (!iter.hasNext()) + throw new Exception("Scan stopped permaturely at " + row); + + Entry<Key,Value> entry = iter.next(); + + verifyEntry(row, entry); + } + + if (iter.hasNext()) + throw new Exception("Scanner returned too much"); + + } + + bw.close(); + + // test adding a mutation to a closed batch writer + boolean caught = false; + try { + bw.addMutation(new Mutation(new Text("foobar"))); + } catch (IllegalStateException ise) { + caught = true; + } + + if (!caught) { + throw new Exception("Adding to closed batch writer did not fail"); + } + } + + private void verifyEntry(int row, Entry<Key,Value> entry) throws Exception { + if (!entry.getKey().getRow().toString().equals(String.format("r_%10d", row))) { + throw new Exception("Unexpected key returned, expected " + row + " got " + entry.getKey()); + } + + if (!entry.getValue().toString().equals("" + row)) { + throw new Exception("Unexpected value, expected " + row + " got " + entry.getValue()); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java index 7974c6c,0000000..ffe55bd mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/BulkFileIT.java @@@ -1,120 -1,0 +1,121 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.volume.VolumeConfiguration; +import org.apache.accumulo.server.conf.ServerConfiguration; +import org.apache.accumulo.server.trace.TraceFileSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class BulkFileIT extends SimpleMacIT { + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + @Test + public void testBulkFile() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + SortedSet<Text> splits = new TreeSet<Text>(); + for (String split : "0333 0666 0999 1333 1666".split(" ")) + splits.add(new Text(split)); + c.tableOperations().addSplits(tableName, splits); + Configuration conf = new Configuration(); + AccumuloConfiguration aconf = ServerConfiguration.getDefaultConfiguration(); + FileSystem fs = TraceFileSystem.wrap(VolumeConfiguration.getDefaultVolume(conf, aconf).getFileSystem()); + + String dir = rootPath() + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0]; + + fs.delete(new Path(dir), true); + + FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, aconf); + writer1.startDefaultLocalityGroup(); + writeData(writer1, 0, 333); + writer1.close(); + + FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, aconf); + writer2.startDefaultLocalityGroup(); + writeData(writer2, 334, 999); + writer2.close(); + + FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, aconf); + writer3.startDefaultLocalityGroup(); + writeData(writer3, 1000, 1999); + writer3.close(); + + FunctionalTestUtils.bulkImport(c, fs, tableName, dir); + + FunctionalTestUtils.checkRFiles(c, tableName, 6, 6, 1, 1); + + verifyData(tableName, 0, 1999); + + } + + private void verifyData(String table, int s, int e) throws Exception { + Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY); + + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + for (int i = s; i <= e; i++) { + if (!iter.hasNext()) + throw new Exception("row " + i + " not found"); + + Entry<Key,Value> entry = iter.next(); + + String row = String.format("%04d", i); + + if (!entry.getKey().getRow().equals(new Text(row))) + throw new Exception("unexpected row " + entry.getKey() + " " + i); + + if (Integer.parseInt(entry.getValue().toString()) != i) + throw new Exception("unexpected value " + entry + " " + i); + } + + if (iter.hasNext()) + throw new Exception("found more than expected " + iter.next()); + } + + private void writeData(FileSKVWriter w, int s, int e) throws Exception { + for (int i = s; i <= e; i++) { - w.append(new Key(new Text(String.format("%04d", i))), new Value(Integer.toString(i).getBytes(Constants.UTF8))); ++ w.append(new Key(new Text(String.format("%04d", i))), new Value(Integer.toString(i).getBytes(UTF_8))); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java index 785a997,0000000..92bd714 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ConcurrencyIT.java @@@ -1,147 -1,0 +1,148 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.util.Collections; +import java.util.EnumSet; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class ConcurrencyIT extends ConfigurableMacIT { + + static class ScanTask extends Thread { + + int count = 0; + Scanner scanner; + + ScanTask(Connector conn, long time) throws Exception { + scanner = conn.createScanner("cct", Authorizations.EMPTY); + IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class); + SlowIterator.setSleepTime(slow, time); + scanner.addScanIterator(slow); + } + + @Override + public void run() { + count = FunctionalTestUtils.count(scanner); + } + + } + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_MAJC_DELAY.getKey(), "1")); + } + + @Override + protected int defaultTimeoutSeconds() { + return 2 * 60; + } + + /* + * Below is a diagram of the operations in this test over time. + * + * Scan 0 |------------------------------| Scan 1 |----------| Minc 1 |-----| Scan 2 |----------| Scan 3 |---------------| Minc 2 |-----| Majc 1 |-----| + */ + + @Test + public void run() throws Exception { + Connector c = getConnector(); + runTest(c); + } + + static void runTest(Connector c) throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, + MutationsRejectedException, Exception, InterruptedException { + c.tableOperations().create("cct"); + IteratorSetting is = new IteratorSetting(10, SlowIterator.class); + SlowIterator.setSleepTime(is, 50); + c.tableOperations().attachIterator("cct", is, EnumSet.of(IteratorScope.minc, IteratorScope.majc)); + c.tableOperations().setProperty("cct", Property.TABLE_MAJC_RATIO.getKey(), "1.0"); + + BatchWriter bw = c.createBatchWriter("cct", new BatchWriterConfig()); + for (int i = 0; i < 50; i++) { + Mutation m = new Mutation(new Text(String.format("%06d", i))); - m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(Constants.UTF8))); ++ m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(UTF_8))); + bw.addMutation(m); + } + bw.flush(); + + ScanTask st0 = new ScanTask(c, 300); + st0.start(); + + ScanTask st1 = new ScanTask(c, 100); + st1.start(); + + UtilWaitThread.sleep(50); + c.tableOperations().flush("cct", null, null, true); + + for (int i = 0; i < 50; i++) { + Mutation m = new Mutation(new Text(String.format("%06d", i))); - m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(Constants.UTF8))); ++ m.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(UTF_8))); + bw.addMutation(m); + } + + bw.flush(); + + ScanTask st2 = new ScanTask(c, 100); + st2.start(); + + st1.join(); + st2.join(); + if (st1.count != 50) + throw new Exception("Thread 1 did not see 50, saw " + st1.count); + + if (st2.count != 50) + throw new Exception("Thread 2 did not see 50, saw " + st2.count); + + ScanTask st3 = new ScanTask(c, 150); + st3.start(); + + UtilWaitThread.sleep(50); + c.tableOperations().flush("cct", null, null, false); + + st3.join(); + if (st3.count != 50) + throw new Exception("Thread 3 did not see 50, saw " + st3.count); + + st0.join(); + if (st0.count != 50) + throw new Exception("Thread 0 did not see 50, saw " + st0.count); + + bw.close(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/ConstraintIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/ConstraintIT.java index d5e9188,0000000..7e5944e mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ConstraintIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ConstraintIT.java @@@ -1,317 -1,0 +1,318 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.ConstraintViolationSummary; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint; +import org.apache.accumulo.examples.simple.constraints.NumericValueConstraint; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class ConstraintIT extends SimpleMacIT { + + @Override + protected int defaultTimeoutSeconds() { + return 30; + } + + @Test + public void run() throws Exception { + String[] tableNames = getUniqueNames(3); + Connector c = getConnector(); + for (String table : tableNames) { + c.tableOperations().create(table); + c.tableOperations().addConstraint(table, NumericValueConstraint.class.getName()); + c.tableOperations().addConstraint(table, AlphaNumKeyConstraint.class.getName()); + } + + // Logger logger = Logger.getLogger(Constants.CORE_PACKAGE_NAME); + // logger.setLevel(Level.TRACE); + + test1(tableNames[0]); + + // logger.setLevel(Level.TRACE); + + test2(tableNames[1], false); + test2(tableNames[2], true); + } + + private void test1(String tableName) throws Exception { + BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); + + Mutation mut1 = new Mutation(new Text("r1")); - mut1.put(new Text("cf1"), new Text("cq1"), new Value("123".getBytes(Constants.UTF8))); ++ mut1.put(new Text("cf1"), new Text("cq1"), new Value("123".getBytes(UTF_8))); + + bw.addMutation(mut1); + + // should not throw any exceptions + bw.close(); + + bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); + + // create a mutation with a non numeric value + Mutation mut2 = new Mutation(new Text("r1")); - mut2.put(new Text("cf1"), new Text("cq1"), new Value("123a".getBytes(Constants.UTF8))); ++ mut2.put(new Text("cf1"), new Text("cq1"), new Value("123a".getBytes(UTF_8))); + + bw.addMutation(mut2); + + boolean sawMRE = false; + + try { + bw.close(); + // should not get here + throw new Exception("Test failed, constraint did not catch bad mutation"); + } catch (MutationsRejectedException mre) { + sawMRE = true; + + // verify constraint violation summary + List<ConstraintViolationSummary> cvsl = mre.getConstraintViolationSummaries(); + + if (cvsl.size() != 1) { + throw new Exception("Unexpected constraints"); + } + + for (ConstraintViolationSummary cvs : cvsl) { + if (!cvs.constrainClass.equals(NumericValueConstraint.class.getName())) { + throw new Exception("Unexpected constraint class " + cvs.constrainClass); + } + + if (cvs.numberOfViolatingMutations != 1) { + throw new Exception("Unexpected # violating mutations " + cvs.numberOfViolatingMutations); + } + } + } + + if (!sawMRE) { + throw new Exception("Did not see MutationsRejectedException"); + } + + // verify mutation did not go through + Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); + scanner.setRange(new Range(new Text("r1"))); + + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + Entry<Key,Value> entry = iter.next(); + + if (!entry.getKey().getRow().equals(new Text("r1")) || !entry.getKey().getColumnFamily().equals(new Text("cf1")) - || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("123".getBytes(Constants.UTF8)))) { ++ || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("123".getBytes(UTF_8)))) { + throw new Exception("Unexpected key or value " + entry.getKey() + " " + entry.getValue()); + } + + if (iter.hasNext()) { + entry = iter.next(); + throw new Exception("Unexpected extra key or value " + entry.getKey() + " " + entry.getValue()); + } + + // remove the numeric value constraint + getConnector().tableOperations().removeConstraint(tableName, 2); + UtilWaitThread.sleep(1000); + + // now should be able to add a non numeric value + bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); + bw.addMutation(mut2); + bw.close(); + + // verify mutation went through + iter = scanner.iterator(); + entry = iter.next(); + + if (!entry.getKey().getRow().equals(new Text("r1")) || !entry.getKey().getColumnFamily().equals(new Text("cf1")) - || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("123a".getBytes(Constants.UTF8)))) { ++ || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("123a".getBytes(UTF_8)))) { + throw new Exception("Unexpected key or value " + entry.getKey() + " " + entry.getValue()); + } + + if (iter.hasNext()) { + entry = iter.next(); + throw new Exception("Unexpected extra key or value " + entry.getKey() + " " + entry.getValue()); + } + + // add a constraint that references a non-existant class + getConnector().tableOperations().setProperty(tableName, Property.TABLE_CONSTRAINT_PREFIX + "1", "com.foobar.nonExistantClass"); + UtilWaitThread.sleep(1000); + + // add a mutation + bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); + + Mutation mut3 = new Mutation(new Text("r1")); - mut3.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(Constants.UTF8))); ++ mut3.put(new Text("cf1"), new Text("cq1"), new Value("foo".getBytes(UTF_8))); + + bw.addMutation(mut3); + + sawMRE = false; + + try { + bw.close(); + // should not get here + throw new Exception("Test failed, mutation went through when table had bad constraints"); + } catch (MutationsRejectedException mre) { + sawMRE = true; + } + + if (!sawMRE) { + throw new Exception("Did not see MutationsRejectedException"); + } + + // verify the mutation did not go through + iter = scanner.iterator(); + entry = iter.next(); + + if (!entry.getKey().getRow().equals(new Text("r1")) || !entry.getKey().getColumnFamily().equals(new Text("cf1")) - || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("123a".getBytes(Constants.UTF8)))) { ++ || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("123a".getBytes(UTF_8)))) { + throw new Exception("Unexpected key or value " + entry.getKey() + " " + entry.getValue()); + } + + if (iter.hasNext()) { + entry = iter.next(); + throw new Exception("Unexpected extra key or value " + entry.getKey() + " " + entry.getValue()); + } + + // remove the bad constraint + getConnector().tableOperations().removeConstraint(tableName, 1); + UtilWaitThread.sleep(1000); + + // try the mutation again + bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); + bw.addMutation(mut3); + bw.close(); + + // verify it went through + iter = scanner.iterator(); + entry = iter.next(); + + if (!entry.getKey().getRow().equals(new Text("r1")) || !entry.getKey().getColumnFamily().equals(new Text("cf1")) - || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("foo".getBytes(Constants.UTF8)))) { ++ || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("foo".getBytes(UTF_8)))) { + throw new Exception("Unexpected key or value " + entry.getKey() + " " + entry.getValue()); + } + + if (iter.hasNext()) { + entry = iter.next(); + throw new Exception("Unexpected extra key or value " + entry.getKey() + " " + entry.getValue()); + } + } + + private Mutation newMut(String row, String cf, String cq, String val) { + Mutation mut1 = new Mutation(new Text(row)); - mut1.put(new Text(cf), new Text(cq), new Value(val.getBytes(Constants.UTF8))); ++ mut1.put(new Text(cf), new Text(cq), new Value(val.getBytes(UTF_8))); + return mut1; + } + + private void test2(String table, boolean doFlush) throws Exception { + // test sending multiple mutations with multiple constrain violations... all of the non violating mutations + // should go through + int numericErrors = 2; + + BatchWriter bw = getConnector().createBatchWriter(table, new BatchWriterConfig()); + bw.addMutation(newMut("r1", "cf1", "cq1", "123")); + bw.addMutation(newMut("r1", "cf1", "cq2", "I'm a bad value")); + if (doFlush) { + try { + bw.flush(); + throw new Exception("Didn't find a bad mutation"); + } catch (MutationsRejectedException mre) { + // ignored + try { + bw.close(); + } catch (MutationsRejectedException ex) { + // ignored + } + bw = getConnector().createBatchWriter(table, new BatchWriterConfig()); + numericErrors = 1; + } + } + bw.addMutation(newMut("r1", "cf1", "cq3", "I'm a naughty value")); + bw.addMutation(newMut("@bad row@", "cf1", "cq2", "456")); + bw.addMutation(newMut("r1", "cf1", "cq4", "789")); + + boolean sawMRE = false; + + try { + bw.close(); + // should not get here + throw new Exception("Test failed, constraint did not catch bad mutation"); + } catch (MutationsRejectedException mre) { + System.out.println(mre); + + sawMRE = true; + + // verify constraint violation summary + List<ConstraintViolationSummary> cvsl = mre.getConstraintViolationSummaries(); + + if (cvsl.size() != 2) { + throw new Exception("Unexpected constraints"); + } + + HashMap<String,Integer> expected = new HashMap<String,Integer>(); + + expected.put("org.apache.accumulo.examples.simple.constraints.NumericValueConstraint", numericErrors); + expected.put("org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint", 1); + + for (ConstraintViolationSummary cvs : cvsl) { + if (expected.get(cvs.constrainClass) != cvs.numberOfViolatingMutations) { + throw new Exception("Unexpected " + cvs.constrainClass + " " + cvs.numberOfViolatingMutations); + } + } + } + + if (!sawMRE) { + throw new Exception("Did not see MutationsRejectedException"); + } + + Scanner scanner = getConnector().createScanner(table, Authorizations.EMPTY); + + Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + Entry<Key,Value> entry = iter.next(); + + if (!entry.getKey().getRow().equals(new Text("r1")) || !entry.getKey().getColumnFamily().equals(new Text("cf1")) - || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("123".getBytes(Constants.UTF8)))) { ++ || !entry.getKey().getColumnQualifier().equals(new Text("cq1")) || !entry.getValue().equals(new Value("123".getBytes(UTF_8)))) { + throw new Exception("Unexpected key or value " + entry.getKey() + " " + entry.getValue()); + } + + entry = iter.next(); + + if (!entry.getKey().getRow().equals(new Text("r1")) || !entry.getKey().getColumnFamily().equals(new Text("cf1")) - || !entry.getKey().getColumnQualifier().equals(new Text("cq4")) || !entry.getValue().equals(new Value("789".getBytes(Constants.UTF8)))) { ++ || !entry.getKey().getColumnQualifier().equals(new Text("cq4")) || !entry.getValue().equals(new Value("789".getBytes(UTF_8)))) { + throw new Exception("Unexpected key or value " + entry.getKey() + " " + entry.getValue()); + } + + if (iter.hasNext()) { + entry = iter.next(); + throw new Exception("Unexpected extra key or value " + entry.getKey() + " " + entry.getValue()); + } + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/CreateAndUseIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/CreateAndUseIT.java index b9b54c7,0000000..5b5249b mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/CreateAndUseIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/CreateAndUseIT.java @@@ -1,128 -1,0 +1,129 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; ++ +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +public class CreateAndUseIT extends SimpleMacIT { + + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + + private static SortedSet<Text> splits; + + @BeforeClass + public static void createData() throws Exception { + splits = new TreeSet<Text>(); + + for (int i = 1; i < 256; i++) { + splits.add(new Text(String.format("%08x", i << 8))); + } + } + + @Test + public void verifyDataIsPresent() throws Exception { + Text cf = new Text("cf1"); + Text cq = new Text("cq1"); + + String tableName = getUniqueNames(1)[0]; + getConnector().tableOperations().create(tableName); + getConnector().tableOperations().addSplits(tableName, splits); + BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig()); + + for (int i = 1; i < 257; i++) { + Mutation m = new Mutation(new Text(String.format("%08x", (i << 8) - 16))); - m.put(cf, cq, new Value(Integer.toString(i).getBytes(Constants.UTF8))); ++ m.put(cf, cq, new Value(Integer.toString(i).getBytes(UTF_8))); + + bw.addMutation(m); + } + + bw.close(); + Scanner scanner1 = getConnector().createScanner(tableName, Authorizations.EMPTY); + + int ei = 1; + + for (Entry<Key,Value> entry : scanner1) { + Assert.assertEquals(String.format("%08x", (ei << 8) - 16), entry.getKey().getRow().toString()); + Assert.assertEquals(Integer.toString(ei), entry.getValue().toString()); + + ei++; + } + + Assert.assertEquals("Did not see expected number of rows", 257, ei); + } + + @Test + public void createTableAndScan() throws Exception { + String table2 = getUniqueNames(1)[0]; + getConnector().tableOperations().create(table2); + getConnector().tableOperations().addSplits(table2, splits); + Scanner scanner2 = getConnector().createScanner(table2, Authorizations.EMPTY); + int count = 0; + for (Entry<Key,Value> entry : scanner2) { + if (entry != null) + count++; + } + + if (count != 0) { + throw new Exception("Did not see expected number of entries, count = " + count); + } + } + + @Test + public void createTableAndBatchScan() throws Exception { + ArrayList<Range> ranges = new ArrayList<Range>(); + for (int i = 1; i < 257; i++) { + ranges.add(new Range(new Text(String.format("%08x", (i << 8) - 16)))); + } + + String table3 = getUniqueNames(1)[0]; + getConnector().tableOperations().create(table3); + getConnector().tableOperations().addSplits(table3, splits); + BatchScanner bs = getConnector().createBatchScanner(table3, Authorizations.EMPTY, 3); + bs.setRanges(ranges); + Iterator<Entry<Key,Value>> iter = bs.iterator(); + int count = Iterators.size(iter); + bs.close(); + + Assert.assertEquals("Did not expect to find any entries", 0, count); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/DeleteEverythingIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/DeleteEverythingIT.java index 83b5b24,0000000..0578ef4 mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/DeleteEverythingIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/DeleteEverythingIT.java @@@ -1,94 -1,0 +1,94 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.*; + +import java.util.Collections; +import java.util.Map.Entry; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class DeleteEverythingIT extends ConfigurableMacIT { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setSiteConfig(Collections.singletonMap(Property.TSERV_MAJC_DELAY.getKey(), "1s")); + } + + @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + @Test + public void run() throws Exception { + Connector c = getConnector(); + c.tableOperations().create("de"); + BatchWriter bw = getConnector().createBatchWriter("de", new BatchWriterConfig()); + Mutation m = new Mutation(new Text("foo")); - m.put(new Text("bar"), new Text("1910"), new Value("5".getBytes(Constants.UTF8))); ++ m.put(new Text("bar"), new Text("1910"), new Value("5".getBytes(UTF_8))); + bw.addMutation(m); + bw.flush(); + + getConnector().tableOperations().flush("de", null, null, true); + + FunctionalTestUtils.checkRFiles(c, "de", 1, 1, 1, 1); + + m = new Mutation(new Text("foo")); + m.putDelete(new Text("bar"), new Text("1910")); + bw.addMutation(m); + bw.flush(); + + Scanner scanner = getConnector().createScanner("de", Authorizations.EMPTY); + scanner.setRange(new Range()); + int count = FunctionalTestUtils.count(scanner); + assertEquals("count == " + count, 0, count); + getConnector().tableOperations().flush("de", null, null, true); + + getConnector().tableOperations().setProperty("de", Property.TABLE_MAJC_RATIO.getKey(), "1.0"); + UtilWaitThread.sleep(4000); + + FunctionalTestUtils.checkRFiles(c, "de", 1, 1, 0, 0); + + bw.close(); + + count = 0; + for (@SuppressWarnings("unused") + Entry<Key,Value> entry : scanner) { + count++; + } + + if (count != 0) + throw new Exception("count == " + count); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9b20a9d4/test/src/test/java/org/apache/accumulo/test/functional/DeleteRowsSplitIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/DeleteRowsSplitIT.java index a8237a6,0000000..d35ba9f mode 100644,000000..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/DeleteRowsSplitIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/DeleteRowsSplitIT.java @@@ -1,143 -1,0 +1,143 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.base.Charsets.UTF_8; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + - import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.junit.Test; + +// attempt to reproduce ACCUMULO-315 +public class DeleteRowsSplitIT extends SimpleMacIT { + + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + + private static final Logger log = Logger.getLogger(DeleteRowsSplitIT.class); + + private static final String LETTERS = "abcdefghijklmnopqrstuvwxyz"; + static final SortedSet<Text> SPLITS = new TreeSet<Text>(); + static final List<String> ROWS = new ArrayList<String>(); + static { - for (byte b : LETTERS.getBytes(Constants.UTF8)) { ++ for (byte b : LETTERS.getBytes(UTF_8)) { + SPLITS.add(new Text(new byte[] {b})); - ROWS.add(new String(new byte[] {b}, Constants.UTF8)); ++ ROWS.add(new String(new byte[] {b}, UTF_8)); + } + } + + @Test + public void run() throws Exception { + // Delete ranges of rows, and verify the are removed + // Do this while adding many splits + final String tableName = getUniqueNames(1)[0]; + + // Eliminate whole tablets + for (int test = 0; test < 10; test++) { + // create a table + log.info("Test " + test); + getConnector().tableOperations().create(tableName); + + // put some data in it + fillTable(tableName); + + // generate a random delete range + final Text start = new Text(); + final Text end = new Text(); + generateRandomRange(start, end); + + // initiate the delete range + final boolean fail[] = {false}; + Thread t = new Thread() { + @Override + public void run() { + try { + // split the table + final SortedSet<Text> afterEnd = SPLITS.tailSet(new Text(end.toString() + "\0")); + getConnector().tableOperations().addSplits(tableName, afterEnd); + } catch (Exception ex) { + log.error(ex, ex); + synchronized (fail) { + fail[0] = true; + } + } + } + }; + t.start(); + + UtilWaitThread.sleep(test * 2); + + getConnector().tableOperations().deleteRows(tableName, start, end); + + t.join(); + synchronized (fail) { + assertTrue(!fail[0]); + } + + // scan the table + Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); + for (Entry<Key,Value> entry : scanner) { + Text row = entry.getKey().getRow(); + assertTrue(row.compareTo(start) <= 0 || row.compareTo(end) > 0); + } + + // delete the table + getConnector().tableOperations().delete(tableName); + } + } + + private void generateRandomRange(Text start, Text end) { + List<String> bunch = new ArrayList<String>(ROWS); + Collections.shuffle(bunch); + if (bunch.get(0).compareTo((bunch.get(1))) < 0) { + start.set(bunch.get(0)); + end.set(bunch.get(1)); + } else { + start.set(bunch.get(1)); + end.set(bunch.get(0)); + } + + } + + private void fillTable(String table) throws Exception { + BatchWriter bw = getConnector().createBatchWriter(table, new BatchWriterConfig()); + for (String row : ROWS) { + Mutation m = new Mutation(row); + m.put("cf", "cq", "value"); + bw.addMutation(m); + } + bw.close(); + } +}