http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java index aaf7d59..ad1ca29 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java @@ -33,10 +33,11 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -137,7 +138,7 @@ public class PerformanceEvaluation extends Configured implements Tool { private int presplitRegions = 0; private boolean useTags = false; private int noOfTags = 1; - private HConnection connection; + private Connection connection; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); /** @@ -501,7 +502,7 @@ public class PerformanceEvaluation extends Configured implements Tool { value.getRows(), value.getTotalRows(), value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(), value.getNoOfTags(), - HConnectionManager.createConnection(context.getConfiguration()), status); + ConnectionFactory.createConnection(context.getConfiguration()), status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); @@ -609,7 +610,7 @@ public class PerformanceEvaluation extends Configured implements Tool { final int preSplitRegions = this.presplitRegions; final boolean useTags = this.useTags; final int numTags = this.noOfTags; - final HConnection connection = HConnectionManager.createConnection(getConf()); + final Connection connection = ConnectionFactory.createConnection(getConf()); for (int i = 0; i < this.N; i++) { final int index = i; Thread t = new Thread ("TestClient-" + i) { @@ -684,7 +685,7 @@ public class PerformanceEvaluation extends Configured implements Tool { Path inputDir = writeInputFile(conf); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation"); @@ -790,14 +791,14 @@ public class PerformanceEvaluation extends Configured implements Tool { private boolean writeToWAL = true; private boolean useTags = false; private int noOfTags = 0; - private HConnection connection; + private Connection connection; TestOptions() { } TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, - int noOfTags, HConnection connection) { + int noOfTags, Connection connection) { this.startRow = startRow; this.perClientRunRows = perClientRunRows; this.totalRows = totalRows; @@ -838,7 +839,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return writeToWAL; } - public HConnection getConnection() { + public Connection getConnection() { return connection; } @@ -870,13 +871,11 @@ public class PerformanceEvaluation extends Configured implements Tool { protected final int totalRows; private final Status status; protected TableName tableName; - protected Table table; protected volatile Configuration conf; - protected boolean flushCommits; protected boolean writeToWAL; protected boolean useTags; protected int noOfTags; - protected HConnection connection; + protected Connection connection; /** * Note that all subclasses of this class must provide a public contructor @@ -889,9 +888,7 @@ public class PerformanceEvaluation extends Configured implements Tool { this.totalRows = options.getTotalRows(); this.status = status; this.tableName = options.getTableName(); - this.table = null; this.conf = conf; - this.flushCommits = options.isFlushCommits(); this.writeToWAL = options.isWriteToWAL(); this.useTags = options.isUseTags(); this.noOfTags = options.getNumTags(); @@ -907,18 +904,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return period == 0? this.perClientRunRows: period; } - void testSetup() throws IOException { - this.table = connection.getTable(tableName); - this.table.setAutoFlushTo(false); - } - - void testTakedown() throws IOException { - if (flushCommits) { - this.table.flushCommits(); - } - table.close(); - } - + abstract void testTakedown() throws IOException; /* * Run test * @return Elapsed time. @@ -936,6 +922,8 @@ public class PerformanceEvaluation extends Configured implements Tool { return (System.nanoTime() - startTime) / 1000000; } + abstract void testSetup() throws IOException; + /** * Provides an extension point for tests that don't want a per row invocation. */ @@ -957,8 +945,45 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException; } - @SuppressWarnings("unused") - static class RandomSeekScanTest extends Test { + static abstract class TableTest extends Test { + protected Table table; + + public TableTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + void testSetup() throws IOException { + this.table = connection.getTable(tableName); + } + + @Override + void testTakedown() throws IOException { + table.close(); + } + } + + static abstract class BufferedMutatorTest extends Test { + protected BufferedMutator mutator; + protected boolean flushCommits; + + public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + this.flushCommits = options.isFlushCommits(); + } + + void testSetup() throws IOException { + this.mutator = connection.getBufferedMutator(tableName); + } + + void testTakedown() throws IOException { + if (flushCommits) { + this.mutator.flush(); + } + mutator.close(); + } + } + + static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -981,7 +1006,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @SuppressWarnings("unused") - static abstract class RandomScanWithRangeTest extends Test { + static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1065,7 +1090,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomReadTest extends Test { + static class RandomReadTest extends TableTest { RandomReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1085,7 +1110,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class RandomWriteTest extends Test { + static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1109,11 +1134,11 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class ScanTest extends Test { + static class ScanTest extends TableTest { private ResultScanner testScanner; ScanTest(Configuration conf, TestOptions options, Status status) { @@ -1141,7 +1166,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialReadTest extends Test { + static class SequentialReadTest extends TableTest { SequentialReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1155,7 +1180,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialWriteTest extends Test { + static class SequentialWriteTest extends BufferedMutatorTest { SequentialWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -1180,11 +1205,11 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class FilteredScanTest extends Test { + static class FilteredScanTest extends TableTest { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); FilteredScanTest(Configuration conf, TestOptions options, Status status) { @@ -1268,7 +1293,7 @@ public class PerformanceEvaluation extends Configured implements Tool { long runOneClient(final Class<? extends Test> cmd, final int startRow, final int perClientRunRows, final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, - HConnection connection, final Status status) + Connection connection, final Status status) throws IOException { status.setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows"); @@ -1463,7 +1488,7 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } - this.connection = HConnectionManager.createConnection(getConf()); + this.connection = ConnectionFactory.createConnection(getConf()); final String useTags = "--usetags="; if (cmd.startsWith(useTags)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java index baf9961..121ff65 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.rest.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import java.io.IOException; import java.util.ArrayList; @@ -37,12 +37,12 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -99,9 +99,7 @@ public class TestRemoteTable { htd.addFamily(new HColumnDescriptor(COLUMN_2).setMaxVersions(3)); htd.addFamily(new HColumnDescriptor(COLUMN_3).setMaxVersions(3)); admin.createTable(htd); - Table table = null; - try { - table = new HTable(TEST_UTIL.getConfiguration(), TABLE); + try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) { Put put = new Put(ROW_1); put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1); table.put(put); @@ -110,9 +108,6 @@ public class TestRemoteTable { put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_2); put.add(COLUMN_2, QUALIFIER_2, TS_2, VALUE_2); table.put(put); - table.flushCommits(); - } finally { - if (null != table) table.close(); } remoteTable = new RemoteHTable( new Client(new Cluster().add("localhost", @@ -349,7 +344,7 @@ public class TestRemoteTable { assertTrue(Bytes.equals(VALUE_2, value2)); Delete delete = new Delete(ROW_3); - delete.deleteColumn(COLUMN_2, QUALIFIER_2); + delete.addColumn(COLUMN_2, QUALIFIER_2); remoteTable.delete(delete); get = new Get(ROW_3); @@ -464,7 +459,7 @@ public class TestRemoteTable { assertTrue(Bytes.equals(VALUE_1, value1)); assertNull(value2); assertTrue(remoteTable.exists(get)); - assertEquals(1, remoteTable.exists(Collections.singletonList(get)).length); + assertEquals(1, remoteTable.existsAll(Collections.singletonList(get)).length); Delete delete = new Delete(ROW_1); remoteTable.checkAndDelete(ROW_1, COLUMN_1, QUALIFIER_1, VALUE_1, delete); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index c583923..1f84bb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -55,8 +55,7 @@ import com.google.protobuf.ServiceException; */ public class HTableWrapper implements HTableInterface { - private TableName tableName; - private final Table table; + private final HTableInterface table; private ClusterConnection connection; private final List<HTableInterface> openTables; @@ -73,7 +72,6 @@ public class HTableWrapper implements HTableInterface { private HTableWrapper(List<HTableInterface> openTables, TableName tableName, ClusterConnection connection, ExecutorService pool) throws IOException { - this.tableName = tableName; this.table = connection.getTable(tableName, pool); this.connection = connection; this.openTables = openTables; @@ -232,7 +230,7 @@ public class HTableWrapper implements HTableInterface { @Override public byte[] getTableName() { - return tableName.getName(); + return table.getTableName(); } @Override @@ -307,7 +305,7 @@ public class HTableWrapper implements HTableInterface { @Override public void setAutoFlush(boolean autoFlush) { - table.setAutoFlushTo(autoFlush); + table.setAutoFlush(autoFlush); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java index 563b1f8..6e0d9e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java @@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.InvalidJobConfException; @@ -52,22 +52,22 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, * and write to an HBase table. */ protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> { - private Table m_table; + private BufferedMutator m_mutator; /** * Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the * lifecycle of {@code conn}. */ - public TableRecordWriter(final Table table) throws IOException { - this.m_table = table; + public TableRecordWriter(final BufferedMutator mutator) throws IOException { + this.m_mutator = mutator; } public void close(Reporter reporter) throws IOException { - this.m_table.close(); + this.m_mutator.close(); } public void write(ImmutableBytesWritable key, Put value) throws IOException { - m_table.put(new Put(value)); + m_mutator.mutate(new Put(value)); } } @@ -77,13 +77,12 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, throws IOException { // expecting exactly one path TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE)); - Table table = null; + BufferedMutator mutator = null; // Connection is not closed. Dies with JVM. No possibility for cleanup. Connection connection = ConnectionFactory.createConnection(job); - table = connection.getTable(tableName); + mutator = connection.getBufferedMutator(tableName); // Clear write buffer on fail is true by default so no need to reset it. - table.setAutoFlushTo(false); - return new TableRecordWriter(table); + return new TableRecordWriter(mutator); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java index 20cf50a..616fa81 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java @@ -29,6 +29,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; @@ -73,7 +76,8 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, protected static class MultiTableRecordWriter extends RecordWriter<ImmutableBytesWritable, Mutation> { private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class); - Map<ImmutableBytesWritable, HTable> tables; + Connection connection; + Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>(); Configuration conf; boolean useWriteAheadLogging; @@ -88,7 +92,6 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, boolean useWriteAheadLogging) { LOG.debug("Created new MultiTableRecordReader with WAL " + (useWriteAheadLogging ? "on" : "off")); - this.tables = new HashMap<ImmutableBytesWritable, HTable>(); this.conf = conf; this.useWriteAheadLogging = useWriteAheadLogging; } @@ -96,24 +99,28 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, /** * @param tableName * the name of the table, as a string - * @return the named table + * @return the named mutator * @throws IOException * if there is a problem opening a table */ - HTable getTable(ImmutableBytesWritable tableName) throws IOException { - if (!tables.containsKey(tableName)) { + BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException { + if(this.connection == null){ + this.connection = ConnectionFactory.createConnection(conf); + } + if (!mutatorMap.containsKey(tableName)) { LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing"); - HTable table = new HTable(conf, TableName.valueOf(tableName.get())); - table.setAutoFlushTo(false); - tables.put(tableName, table); + + BufferedMutator mutator = + connection.getBufferedMutator(TableName.valueOf(tableName.get())); + mutatorMap.put(tableName, mutator); } - return tables.get(tableName); + return mutatorMap.get(tableName); } @Override public void close(TaskAttemptContext context) throws IOException { - for (HTable table : tables.values()) { - table.flushCommits(); + for (BufferedMutator mutator : mutatorMap.values()) { + mutator.flush(); } } @@ -129,16 +136,16 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, */ @Override public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException { - HTable table = getTable(tableName); + BufferedMutator mutator = getBufferedMutator(tableName); // The actions are not immutable, so we defensively copy them if (action instanceof Put) { Put put = new Put((Put) action); put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } else if (action instanceof Delete) { Delete delete = new Delete((Delete) action); - table.delete(delete); + mutator.mutate(delete); } else throw new IllegalArgumentException( "action must be either Delete or Put"); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index c46f41f..7b23075 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -29,13 +29,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; @@ -86,7 +86,7 @@ implements Configurable { extends RecordWriter<KEY, Mutation> { private Connection connection; - private Table table; + private BufferedMutator mutator; /** * @throws IOException @@ -95,8 +95,7 @@ implements Configurable { public TableRecordWriter() throws IOException { String tableName = conf.get(OUTPUT_TABLE); this.connection = ConnectionFactory.createConnection(conf); - this.table = connection.getTable(TableName.valueOf(tableName)); - this.table.setAutoFlushTo(false); + this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName)); LOG.info("Created table instance for " + tableName); } /** @@ -104,12 +103,12 @@ implements Configurable { * * @param context The context. * @throws IOException When closing the writer fails. - * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext) + * @see RecordWriter#close(TaskAttemptContext) */ @Override public void close(TaskAttemptContext context) throws IOException { - table.close(); + mutator.close(); connection.close(); } @@ -119,14 +118,15 @@ implements Configurable { * @param key The key. * @param value The value. * @throws IOException When writing fails. - * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object) + * @see RecordWriter#write(Object, Object) */ @Override public void write(KEY key, Mutation value) throws IOException { - if (value instanceof Put) table.put(new Put((Put)value)); - else if (value instanceof Delete) table.delete(new Delete((Delete)value)); - else throw new IOException("Pass a Delete or a Put"); + if (!(value instanceof Put) && !(value instanceof Delete)) { + throw new IOException("Pass a Delete or a Put"); + } + mutator.mutate(value); } } @@ -137,11 +137,9 @@ implements Configurable { * @return The newly created writer instance. * @throws IOException When creating the writer fails. * @throws InterruptedException When the jobs is cancelled. - * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext) */ @Override - public RecordWriter<KEY, Mutation> getRecordWriter( - TaskAttemptContext context) + public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { return new TableRecordWriter(); } @@ -152,7 +150,7 @@ implements Configurable { * @param context The current context. * @throws IOException When the check fails. * @throws InterruptedException When the job is aborted. - * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext) + * @see OutputFormat#checkOutputSpecs(JobContext) */ @Override public void checkOutputSpecs(JobContext context) throws IOException, @@ -168,7 +166,7 @@ implements Configurable { * @return The committer. * @throws IOException When creating the committer fails. * @throws InterruptedException When the job is aborted. - * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext) + * @see OutputFormat#getOutputCommitter(TaskAttemptContext) */ @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 5a89af4..e195c09 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -52,13 +52,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -394,6 +393,7 @@ public class PerformanceEvaluation extends Configured implements Tool { throws IOException, InterruptedException { final Class<? extends Test> cmd = determineCommandClass(opts.cmdName); assert cmd != null; + @SuppressWarnings("unchecked") Future<RunResult>[] threads = new Future[opts.numClientThreads]; RunResult[] results = new RunResult[opts.numClientThreads]; ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, @@ -459,7 +459,7 @@ public class PerformanceEvaluation extends Configured implements Tool { Path inputDir = writeInputFile(conf, opts); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation - " + opts.cmdName); @@ -909,7 +909,6 @@ public class PerformanceEvaluation extends Configured implements Tool { private final Sampler<?> traceSampler; private final SpanReceiverHost receiverHost; protected Connection connection; - protected Table table; private String testName; private Histogram latency; @@ -991,25 +990,25 @@ public class PerformanceEvaluation extends Configured implements Tool { if (!opts.oneCon) { this.connection = ConnectionFactory.createConnection(conf); } - this.table = new HTable(TableName.valueOf(opts.tableName), connection); - this.table.setAutoFlushTo(opts.autoFlush); + onStartup(); latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); } + abstract void onStartup() throws IOException; + void testTakedown() throws IOException { reportLatency(); reportValueSize(); - if (opts.flushCommits) { - this.table.flushCommits(); - } - table.close(); + onTakedown(); if (!opts.oneCon) { connection.close(); } receiverHost.closeReceivers(); } + abstract void onTakedown() throws IOException; + /* * Run test * @return Elapsed time. @@ -1100,7 +1099,43 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException, InterruptedException; } - static class RandomSeekScanTest extends Test { + static abstract class TableTest extends Test { + protected Table table; + + TableTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.table = connection.getTable(TableName.valueOf(opts.tableName)); + } + + @Override + void onTakedown() throws IOException { + table.close(); + } + } + + static abstract class BufferedMutatorTest extends Test { + protected BufferedMutator mutator; + + BufferedMutatorTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName)); + } + + @Override + void onTakedown() throws IOException { + mutator.close(); + } + } + + static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1130,7 +1165,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static abstract class RandomScanWithRangeTest extends Test { + static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1218,7 +1253,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomReadTest extends Test { + static class RandomReadTest extends TableTest { private final Consistency consistency; private ArrayList<Get> gets; private Random rd = new Random(); @@ -1272,7 +1307,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomWriteTest extends Test { + static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1298,11 +1333,11 @@ public class PerformanceEvaluation extends Configured implements Tool { updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class ScanTest extends Test { + static class ScanTest extends TableTest { private ResultScanner testScanner; ScanTest(Connection con, TestOptions options, Status status) { @@ -1335,7 +1370,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialReadTest extends Test { + static class SequentialReadTest extends TableTest { SequentialReadTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1351,7 +1386,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class SequentialWriteTest extends Test { + static class SequentialWriteTest extends BufferedMutatorTest { SequentialWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1377,11 +1412,11 @@ public class PerformanceEvaluation extends Configured implements Tool { updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class FilteredScanTest extends Test { + static class FilteredScanTest extends TableTest { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); FilteredScanTest(Connection con, TestOptions options, Status status) { @@ -1533,12 +1568,9 @@ public class PerformanceEvaluation extends Configured implements Tool { // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do // the TestOptions introspection for us and dump the output in a readable format. LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts)); - Admin admin = null; - try { - admin = new HBaseAdmin(getConf()); + try(Connection conn = ConnectionFactory.createConnection(getConf()); + Admin admin = conn.getAdmin()) { checkTable(admin, opts); - } finally { - if (admin != null) admin.close(); } if (opts.nomapred) { doLocalClients(opts, getConf()); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java index 0bba51a..d7e56c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java @@ -133,7 +133,7 @@ public class TestClientPushback { final CountDownLatch latch = new CountDownLatch(1); final AtomicLong endTime = new AtomicLong(); long startTime = EnvironmentEdgeManager.currentTime(); - table.ap.submit(tablename, ops, true, new Batch.Callback<Result>() { + table.mutator.ap.submit(tablename, ops, true, new Batch.Callback<Result>() { @Override public void update(byte[] region, byte[] row, Result result) { endTime.set(EnvironmentEdgeManager.currentTime()); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java index b6502c5..e05a2fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java @@ -101,31 +101,30 @@ public class TestCloneSnapshotFromClient { // take an empty snapshot admin.snapshot(emptySnapshot, tableName); - HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); - try { - // enable table and insert data - admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); + // enable table and insert data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)){ snapshot0Rows = TEST_UTIL.countRows(table); - admin.disableTable(tableName); + } + admin.disableTable(tableName); - // take a snapshot - admin.snapshot(snapshotName0, tableName); + // take a snapshot + admin.snapshot(snapshotName0, tableName); - // enable table and insert more data - admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); + // enable table and insert more data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)){ snapshot1Rows = TEST_UTIL.countRows(table); - admin.disableTable(tableName); + } + admin.disableTable(tableName); - // take a snapshot of the updated table - admin.snapshot(snapshotName1, tableName); + // take a snapshot of the updated table + admin.snapshot(snapshotName1, tableName); - // re-enable table - admin.enableTable(tableName); - } finally { - table.close(); - } + // re-enable table + admin.enableTable(tableName); } protected int getNumReplicas() { http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index bc805fe..f10e88b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -692,15 +692,15 @@ public class TestFromClientSide { public void testMaxKeyValueSize() throws Exception { byte [] TABLE = Bytes.toBytes("testMaxKeyValueSize"); Configuration conf = TEST_UTIL.getConfiguration(); - String oldMaxSize = conf.get("hbase.client.keyvalue.maxsize"); + String oldMaxSize = conf.get(TableConfiguration.MAX_KEYVALUE_SIZE_KEY); Table ht = TEST_UTIL.createTable(TABLE, FAMILY); byte[] value = new byte[4 * 1024 * 1024]; Put put = new Put(ROW); put.add(FAMILY, QUALIFIER, value); ht.put(put); try { - TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 2 * 1024 * 1024); - TABLE = Bytes.toBytes("testMaxKeyValueSize2"); + TEST_UTIL.getConfiguration().setInt( + TableConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024); // Create new table so we pick up the change in Configuration. try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { @@ -712,7 +712,7 @@ public class TestFromClientSide { } fail("Inserting a too large KeyValue worked, should throw exception"); } catch(Exception e) {} - conf.set("hbase.client.keyvalue.maxsize", oldMaxSize); + conf.set(TableConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize); } @Test @@ -3906,7 +3906,7 @@ public class TestFromClientSide { final int NB_BATCH_ROWS = 10; HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedOneFlush"), new byte [][] {CONTENTS_FAMILY, SMALL_FAMILY}); - table.setAutoFlushTo(false); + table.setAutoFlush(false); ArrayList<Put> rowsUpdate = new ArrayList<Put>(); for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { byte[] row = Bytes.toBytes("row" + i); @@ -3937,6 +3937,7 @@ public class TestFromClientSide { Result row : scanner) nbRows++; assertEquals(NB_BATCH_ROWS * 10, nbRows); + table.close(); } @Test @@ -3947,7 +3948,6 @@ public class TestFromClientSide { final int NB_BATCH_ROWS = 10; HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedManyManyFlushes"), new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY }); - table.setAutoFlushTo(false); table.setWriteBufferSize(10); ArrayList<Put> rowsUpdate = new ArrayList<Put>(); for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { @@ -3959,8 +3959,6 @@ public class TestFromClientSide { } table.put(rowsUpdate); - table.flushCommits(); - Scan scan = new Scan(); scan.addFamily(CONTENTS_FAMILY); ResultScanner scanner = table.getScanner(scan); @@ -4149,6 +4147,7 @@ public class TestFromClientSide { HBaseAdmin ha = new HBaseAdmin(t.getConnection()); assertTrue(ha.tableExists(tableName)); assertTrue(t.get(new Get(ROW)).isEmpty()); + ha.close(); } /** @@ -4162,9 +4161,10 @@ public class TestFromClientSide { final TableName tableName = TableName.valueOf("testUnmanagedHConnectionReconnect"); HTable t = createUnmangedHConnectionHTable(tableName); Connection conn = t.getConnection(); - HBaseAdmin ha = new HBaseAdmin(conn); - assertTrue(ha.tableExists(tableName)); - assertTrue(t.get(new Get(ROW)).isEmpty()); + try (HBaseAdmin ha = new HBaseAdmin(conn)) { + assertTrue(ha.tableExists(tableName)); + assertTrue(t.get(new Get(ROW)).isEmpty()); + } // stop the master MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); @@ -4177,9 +4177,10 @@ public class TestFromClientSide { // test that the same unmanaged connection works with a new // HBaseAdmin and can connect to the new master; - HBaseAdmin newAdmin = new HBaseAdmin(conn); - assertTrue(newAdmin.tableExists(tableName)); - assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES); + try (HBaseAdmin newAdmin = new HBaseAdmin(conn)) { + assertTrue(newAdmin.tableExists(tableName)); + assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES); + } } @Test @@ -4276,7 +4277,6 @@ public class TestFromClientSide { new byte[][] { HConstants.CATALOG_FAMILY, Bytes.toBytes("info2") }, 1, 1024); // set block size to 64 to making 2 kvs into one block, bypassing the walkForwardInSingleRow // in Store.rowAtOrBeforeFromStoreFile - table.setAutoFlush(true); String regionName = table.getRegionLocations().firstKey().getEncodedName(); HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableAname).getFromOnlineRegions(regionName); @@ -4351,6 +4351,8 @@ public class TestFromClientSide { assertTrue(result.containsColumn(HConstants.CATALOG_FAMILY, null)); assertTrue(Bytes.equals(result.getRow(), forthRow)); assertTrue(Bytes.equals(result.getValue(HConstants.CATALOG_FAMILY, null), four)); + + table.close(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 428c637..61cb16a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -149,7 +149,7 @@ public class TestMultiParallel { ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); try { try (Table t = connection.getTable(TEST_TABLE, executor)) { - List<Row> puts = constructPutRequests(); // creates a Put for every region + List<Put> puts = constructPutRequests(); // creates a Put for every region t.batch(puts); HashSet<ServerName> regionservers = new HashSet<ServerName>(); try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { @@ -172,7 +172,7 @@ public class TestMultiParallel { Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // load test data - List<Row> puts = constructPutRequests(); + List<Put> puts = constructPutRequests(); table.batch(puts); // create a list of gets and run it @@ -261,17 +261,13 @@ public class TestMultiParallel { private void doTestFlushCommits(boolean doAbort) throws Exception { // Load the data LOG.info("get new table"); - Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); - table.setAutoFlushTo(false); + Table table = UTIL.getConnection().getTable(TEST_TABLE); table.setWriteBufferSize(10 * 1024 * 1024); LOG.info("constructPutRequests"); - List<Row> puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } + List<Put> puts = constructPutRequests(); + table.put(puts); LOG.info("puts"); - table.flushCommits(); final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads() .size(); assert liveRScount > 0; @@ -290,11 +286,7 @@ public class TestMultiParallel { // try putting more keys after the abort. same key/qual... just validating // no exceptions thrown puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } - - table.flushCommits(); + table.put(puts); } LOG.info("validating loaded data"); @@ -331,7 +323,7 @@ public class TestMultiParallel { LOG.info("test=testBatchWithPut"); Table table = CONNECTION.getTable(TEST_TABLE); // put multiple rows using a batch - List<Row> puts = constructPutRequests(); + List<Put> puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -363,7 +355,7 @@ public class TestMultiParallel { Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // Load some data - List<Row> puts = constructPutRequests(); + List<Put> puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -371,7 +363,7 @@ public class TestMultiParallel { List<Row> deletes = new ArrayList<Row>(); for (int i = 0; i < KEYS.length; i++) { Delete delete = new Delete(KEYS[i]); - delete.deleteFamily(BYTES_FAMILY); + delete.addFamily(BYTES_FAMILY); deletes.add(delete); } results = table.batch(deletes); @@ -392,7 +384,7 @@ public class TestMultiParallel { Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // Load some data - List<Row> puts = constructPutRequests(); + List<Put> puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -664,8 +656,8 @@ public class TestMultiParallel { } } - private List<Row> constructPutRequests() { - List<Row> puts = new ArrayList<Row>(); + private List<Put> constructPutRequests() { + List<Put> puts = new ArrayList<>(); for (byte[] k : KEYS) { Put put = new Put(k); put.add(BYTES_FAMILY, QUALIFIER, VALUE); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java index 0eec477..d488f33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java @@ -110,11 +110,12 @@ public class TestRestoreSnapshotFromClient { // take an empty snapshot admin.snapshot(emptySnapshot, tableName); - HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); // enable table and insert data admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); - snapshot0Rows = TEST_UTIL.countRows(table); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + snapshot0Rows = TEST_UTIL.countRows(table); + } admin.disableTable(tableName); // take a snapshot @@ -122,9 +123,10 @@ public class TestRestoreSnapshotFromClient { // enable table and insert more data admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); - snapshot1Rows = TEST_UTIL.countRows(table); - table.close(); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + snapshot1Rows = TEST_UTIL.countRows(table); + } } @After @@ -183,7 +185,7 @@ public class TestRestoreSnapshotFromClient { assertEquals(2, table.getTableDescriptor().getFamilies().size()); HTableDescriptor htd = admin.getTableDescriptor(tableName); assertEquals(2, htd.getFamilies().size()); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, TEST_FAMILY2); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2); long snapshot2Rows = snapshot1Rows + 500; assertEquals(snapshot2Rows, TEST_UTIL.countRows(table)); assertEquals(500, TEST_UTIL.countRows(table, TEST_FAMILY2)); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index 3db2d9f..94a7819 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -130,18 +130,18 @@ public class TestRpcControllerFactory { // change one of the connection properties so we get a new HConnection with our configuration conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1); - Table table = new HTable(conf, name); - table.setAutoFlushTo(false); + Connection connection = ConnectionFactory.createConnection(conf); + Table table = connection.getTable(name); byte[] row = Bytes.toBytes("row"); Put p = new Put(row); p.add(fam1, fam1, Bytes.toBytes("val0")); table.put(p); - table.flushCommits(); + Integer counter = 1; counter = verifyCount(counter); Delete d = new Delete(row); - d.deleteColumn(fam1, fam1); + d.addColumn(fam1, fam1); table.delete(d); counter = verifyCount(counter); @@ -200,4 +200,4 @@ public class TestRpcControllerFactory { assertEquals(0, CountingRpcController.INT_PRIORITY.get()); return counter + 1; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java index 1c6bf8e..d178ba1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java @@ -175,11 +175,11 @@ public class TestHTableWrapper { private void checkAutoFlush() { boolean initialAutoFlush = hTableInterface.isAutoFlush(); - hTableInterface.setAutoFlushTo(false); + hTableInterface.setAutoFlush(false); assertFalse(hTableInterface.isAutoFlush()); - hTableInterface.setAutoFlushTo(true); + hTableInterface.setAutoFlush(true); assertTrue(hTableInterface.isAutoFlush()); - hTableInterface.setAutoFlushTo(initialAutoFlush); + hTableInterface.setAutoFlush(initialAutoFlush); } private void checkBufferSize() throws IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 8daafe4..fa4564c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -937,7 +937,6 @@ public class TestDistributedLogSplitting { if (key == null || key.length == 0) { key = new byte[] { 0, 0, 0, 0, 1 }; } - ht.setAutoFlushTo(true); Put put = new Put(key); put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); ht.put(put); @@ -1607,11 +1606,11 @@ public class TestDistributedLogSplitting { /** * Load table with puts and deletes with expected values so that we can verify later */ - private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException { - t.setAutoFlushTo(false); + private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException { byte[] k = new byte[3]; // add puts + List<Put> puts = new ArrayList<>(); for (byte b1 = 'a'; b1 <= 'z'; b1++) { for (byte b2 = 'a'; b2 <= 'z'; b2++) { for (byte b3 = 'a'; b3 <= 'z'; b3++) { @@ -1620,11 +1619,11 @@ public class TestDistributedLogSplitting { k[2] = b3; Put put = new Put(k); put.add(f, column, k); - t.put(put); + puts.add(put); } } } - t.flushCommits(); + t.put(puts); // add deletes for (byte b3 = 'a'; b3 <= 'z'; b3++) { k[0] = 'a'; @@ -1633,7 +1632,6 @@ public class TestDistributedLogSplitting { Delete del = new Delete(k); t.delete(del); } - t.flushCommits(); } private void waitForCounter(AtomicLong ctr, long oldval, long newval, http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index c1c148e..70cb2fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -82,11 +82,11 @@ public class TestMaster { MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); HMaster m = cluster.getMaster(); - HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME); - assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME, - ZooKeeperProtos.Table.State.ENABLED)); - TEST_UTIL.loadTable(ht, FAMILYNAME, false); - ht.close(); + try (HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME)) { + assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME, + ZooKeeperProtos.Table.State.ENABLED)); + TEST_UTIL.loadTable(ht, FAMILYNAME, false); + } List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor.getTableRegionsAndLocations( m.getZooKeeper(), http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index 6d2b172..179ae9a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -101,9 +101,9 @@ public class TestEndToEndSplitTransaction { TableName tableName = TableName.valueOf("TestSplit"); byte[] familyName = Bytes.toBytes("fam"); - HTable ht = TEST_UTIL.createTable(tableName, familyName); - TEST_UTIL.loadTable(ht, familyName, false); - ht.close(); + try (HTable ht = TEST_UTIL.createTable(tableName, familyName)) { + TEST_UTIL.loadTable(ht, familyName, false); + } HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0); byte []firstRow = Bytes.toBytes("aaa"); byte []splitRow = Bytes.toBytes("lll"); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java index c508e70..d24ba4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java @@ -31,7 +31,6 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; @@ -42,11 +41,11 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileContext; @@ -200,23 +199,22 @@ public class TestFSErrorsExposed { util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Make a new Configuration so it makes a new connection that has the // above configuration on it; else we use the old one w/ 10 as default. - HTable table = new HTable(new Configuration(util.getConfiguration()), tableName); - - // Load some data - util.loadTable(table, fam, false); - table.flushCommits(); - util.flush(); - util.countRows(table); + try (Table table = util.getConnection().getTable(tableName)) { + // Load some data + util.loadTable(table, fam, false); + util.flush(); + util.countRows(table); - // Kill the DFS cluster - util.getDFSCluster().shutdownDataNodes(); + // Kill the DFS cluster + util.getDFSCluster().shutdownDataNodes(); - try { - util.countRows(table); - fail("Did not fail to count after removing data"); - } catch (Exception e) { - LOG.info("Got expected error", e); - assertTrue(e.getMessage().contains("Could not seek")); + try { + util.countRows(table); + fail("Did not fail to count after removing data"); + } catch (Exception e) { + LOG.info("Got expected error", e); + assertTrue(e.getMessage().contains("Could not seek")); + } } // Restart data nodes so that HBase can shut down cleanly. http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java index 46a4062..08b3b99 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java @@ -78,6 +78,7 @@ public class TestRegionFavoredNodes { @AfterClass public static void tearDownAfterClass() throws Exception { + table.close(); if (createWithFavoredNode == null) { return; } http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 9ba224a..473946c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -30,6 +30,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; + import static org.junit.Assert.*; import java.io.IOException; @@ -108,10 +109,11 @@ public class TestRegionServerMetrics { TEST_UTIL.createTable(tName, cfName); - new HTable(conf, tName).close(); //wait for the table to come up. + Connection connection = TEST_UTIL.getConnection(); + connection.getTable(tName).close(); //wait for the table to come up. // Do a first put to be sure that the connection is established, meta is there and so on. - HTable table = new HTable(conf, tName); + Table table = connection.getTable(tName); Put p = new Put(row); p.add(cfName, qualifier, initValue); table.put(p); @@ -140,19 +142,21 @@ public class TestRegionServerMetrics { metricsHelper.assertCounter("readRequestCount", readRequests + 10, serverSource); metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource); - for ( HRegionInfo i:table.getRegionLocations().keySet()) { - MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) - .getMetrics() - .getSource() - .getAggregateSource(); - String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ - "_table_"+tableNameString + - "_region_" + i.getEncodedName()+ - "_metric"; - metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg); - metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg); + try (RegionLocator locator = connection.getRegionLocator(tName)) { + for ( HRegionLocation location: locator.getAllRegionLocations()) { + HRegionInfo i = location.getRegionInfo(); + MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) + .getMetrics() + .getSource() + .getAggregateSource(); + String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ + "_table_"+tableNameString + + "_region_" + i.getEncodedName()+ + "_metric"; + metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg); + metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg); + } } - List<Get> gets = new ArrayList<Get>(); for (int i=0; i< 10; i++) { gets.add(new Get(row)); @@ -169,11 +173,11 @@ public class TestRegionServerMetrics { metricsHelper.assertCounter("readRequestCount", readRequests + 20, serverSource); metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource); - table.setAutoFlushTo(false); + List<Put> puts = new ArrayList<>(); for (int i=0; i< 30; i++) { - table.put(p); + puts.add(p); } - table.flushCommits(); + table.put(puts); metricsRegionServer.getRegionServerWrapper().forceRecompute(); metricsHelper.assertCounter("totalRequestCount", requests + 80, serverSource); @@ -338,36 +342,39 @@ public class TestRegionServerMetrics { byte[] val = Bytes.toBytes("One"); - TEST_UTIL.createTable(tableName, cf); - HTable t = new HTable(conf, tableName); - t.setAutoFlushTo(false); + List<Put> puts = new ArrayList<>(); for (int insertCount =0; insertCount < 100; insertCount++) { Put p = new Put(Bytes.toBytes("" + insertCount + "row")); p.add(cf, qualifier, val); - t.put(p); + puts.add(p); } - t.flushCommits(); - - Scan s = new Scan(); - s.setBatch(1); - s.setCaching(1); - ResultScanner resultScanners = t.getScanner(s); - - for (int nextCount = 0; nextCount < 30; nextCount++) { - Result result = resultScanners.next(); - assertNotNull(result); - assertEquals(1, result.size()); + try (HTable t = TEST_UTIL.createTable(tableName, cf)) { + t.put(puts); + + Scan s = new Scan(); + s.setBatch(1); + s.setCaching(1); + ResultScanner resultScanners = t.getScanner(s); + + for (int nextCount = 0; nextCount < 30; nextCount++) { + Result result = resultScanners.next(); + assertNotNull(result); + assertEquals(1, result.size()); + } } - for ( HRegionInfo i:t.getRegionLocations().keySet()) { - MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) - .getMetrics() - .getSource() - .getAggregateSource(); - String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ - "_table_"+tableNameString + - "_region_" + i.getEncodedName()+ - "_metric"; - metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg); + try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { + for ( HRegionLocation location: locator.getAllRegionLocations()) { + HRegionInfo i = location.getRegionInfo(); + MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) + .getMetrics() + .getSource() + .getAggregateSource(); + String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ + "_table_"+tableNameString + + "_region_" + i.getEncodedName()+ + "_metric"; + metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg); + } } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index 8e3ca88..63d9c9e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -88,7 +88,6 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version3"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); scanner = table.getScanner(scan); result = scanner.next(); @@ -169,19 +168,16 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version0"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); Put put1 = new Put(Bytes.toBytes("row2")); put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version0"))); table.put(put1); - table.flushCommits(); admin.flush(tableName); put0 = new Put(Bytes.toBytes("row1")); put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version1"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); admin.compact(tableName); @@ -218,8 +214,7 @@ public class TestScannerWithBulkload { put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes.toBytes("version0"))); table.put(put1); - table.flushCommits(); - bulkload.doBulkLoad(hfilePath, table); + bulkload.doBulkLoad(hfilePath, (HTable) table); latch.countDown(); } catch (TableNotFoundException e) { } catch (IOException e) { @@ -260,7 +255,6 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version3"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); scanner = table.getScanner(scan); result = scanner.next(); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 0890c48..2e18fba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -310,8 +310,8 @@ public class TestLogRolling { desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); admin.createTable(desc); - Table table = new HTable(TEST_UTIL.getConfiguration(), desc.getTableName()); - assertTrue(table.isAutoFlush()); + Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); + assertTrue(((HTable) table).isAutoFlush()); server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); final FSHLog log = (FSHLog) server.getWAL(null); @@ -455,8 +455,6 @@ public class TestLogRolling { writeData(table, 1002); - table.setAutoFlushTo(true); - long curTime = System.currentTimeMillis(); LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log)); long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java index d0caa45..d858321 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -53,7 +52,6 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas */ @Before public void setUp() throws Exception { - htable1.setAutoFlushTo(false); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for (JVMClusterUtil.RegionServerThread r : @@ -118,7 +116,10 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas Put put = new Put(row); put.add(famName, row, row); - htable1 = new HTable(conf1, tableName); + if (htable1 == null) { + htable1 = utility1.getConnection().getTable(tableName); + } + htable1.put(put); Get get = new Get(row); http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 7ecdaf7..4163b66 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -35,12 +35,12 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -69,7 +69,6 @@ public class TestReplicationSmallTests extends TestReplicationBase { */ @Before public void setUp() throws Exception { - htable1.setAutoFlushTo(true); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for ( JVMClusterUtil.RegionServerThread r : http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java index 6a39c8a..5010365 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java @@ -149,9 +149,8 @@ public class TestReplicationWithTags { Admin admin = conn.getAdmin()) { admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } - htable1 = new HTable(conf1, TABLE_NAME); - htable1.setWriteBufferSize(1024); - htable2 = new HTable(conf2, TABLE_NAME); + htable1 = utility1.getConnection().getTable(TABLE_NAME); + htable2 = utility2.getConnection().getTable(TABLE_NAME); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index 1b1312a..98a0886 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -393,25 +393,18 @@ public class TestVisibilityLabelsReplication { } static Table writeData(TableName tableName, String... labelExps) throws Exception { - HTable table = null; - try { - table = new HTable(conf, TABLE_NAME_BYTES); - int i = 1; - List<Put> puts = new ArrayList<Put>(); - for (String labelExp : labelExps) { - Put put = new Put(Bytes.toBytes("row" + i)); - put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value); - put.setCellVisibility(new CellVisibility(labelExp)); - put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP)); - puts.add(put); - i++; - } - table.put(puts); - } finally { - if (table != null) { - table.flushCommits(); - } + Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME)); + int i = 1; + List<Put> puts = new ArrayList<Put>(); + for (String labelExp : labelExps) { + Put put = new Put(Bytes.toBytes("row" + i)); + put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value); + put.setCellVisibility(new CellVisibility(labelExp)); + put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP)); + puts.add(put); + i++; } + table.put(puts); return table; } // A simple BaseRegionbserver impl that allows to add a non-visibility tag from the http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java index 0d87dc2..11015a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.ArrayList; import java.util.HashSet; - import java.util.List; import java.util.Map; import java.util.Set; @@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; @@ -670,20 +670,23 @@ public class SnapshotTestingUtils { public static void loadData(final HBaseTestingUtility util, final TableName tableName, int rows, byte[]... families) throws IOException, InterruptedException { - loadData(util, new HTable(util.getConfiguration(), tableName), rows, families); + try (BufferedMutator mutator = util.getConnection().getBufferedMutator(tableName)) { + loadData(util, mutator, rows, families); + } } - public static void loadData(final HBaseTestingUtility util, final Table table, int rows, + public static void loadData(final HBaseTestingUtility util, final BufferedMutator mutator, int rows, byte[]... families) throws IOException, InterruptedException { - table.setAutoFlushTo(false); - // Ensure one row per region assertTrue(rows >= KEYS.length); for (byte k0: KEYS) { byte[] k = new byte[] { k0 }; byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k); byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); - putData(table, families, key, value); + final byte[][] families1 = families; + final byte[] key1 = key; + final byte[] value1 = value; + mutator.mutate(createPut(families1, key1, value1)); rows--; } @@ -691,22 +694,24 @@ public class SnapshotTestingUtils { while (rows-- > 0) { byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows)); byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); - putData(table, families, key, value); + final byte[][] families1 = families; + final byte[] key1 = key; + final byte[] value1 = value; + mutator.mutate(createPut(families1, key1, value1)); } - table.flushCommits(); + mutator.flush(); - waitForTableToBeOnline(util, table.getName()); + waitForTableToBeOnline(util, mutator.getName()); } - private static void putData(final Table table, final byte[][] families, - final byte[] key, final byte[] value) throws IOException { + private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { byte[] q = Bytes.toBytes("q"); Put put = new Put(key); put.setDurability(Durability.SKIP_WAL); for (byte[] family: families) { put.add(family, q, value); } - table.put(put); + return put; } public static void deleteAllSnapshots(final Admin admin) http://git-wip-us.apache.org/repos/asf/hbase/blob/7bbbaaeb/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java index a1f4605..710d4ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java @@ -37,12 +37,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.ipc.AbstractRpcClient; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; @@ -70,7 +70,6 @@ public class TestFlushSnapshotFromClient { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final int NUM_RS = 2; private static final byte[] TEST_FAM = Bytes.toBytes("fam"); - private static final byte[] TEST_QUAL = Bytes.toBytes("q"); private static final TableName TABLE_NAME = TableName.valueOf("test"); private final int DEFAULT_NUM_ROWS = 100; @@ -139,8 +138,7 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME); - SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM); + SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM); LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(), @@ -178,8 +176,9 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME); - UTIL.loadTable(table, TEST_FAM); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { + UTIL.loadTable(table, TEST_FAM); + } LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(), @@ -222,8 +221,7 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME); - SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM); + SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM); LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(),
