HBASE-12783 Revert - two tests in TestAssignmentManager fail
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/173eba81
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/173eba81
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/173eba81

Branch: refs/heads/branch-1
Commit: 173eba815bd7d97d15be69893d4b0836a08cf42b
Parents: 44f392a
Author: tedyu <[email protected]>
Authored: Sat Jan 3 06:57:46 2015 -0800
Committer: tedyu <[email protected]>
Committed: Sat Jan 3 06:57:46 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ConnectionManager.java  |   6 +-
 .../hadoop/hbase/client/HRegionLocator.java     | 148 -------------
 .../org/apache/hadoop/hbase/client/HTable.java  |  71 +++---
 .../apache/hadoop/hbase/client/HTableUtil.java  |   4 +-
 .../hbase/client/RegionServerCallable.java      |  10 +-
 .../hbase/zookeeper/MetaTableLocator.java       |   2 +
 .../mapreduce/IntegrationTestBulkLoad.java      |  52 ++---
 .../hbase/mapreduce/HFileOutputFormat.java      |  25 ++-
 .../hbase/mapreduce/HFileOutputFormat2.java     |  80 +++----
 .../apache/hadoop/hbase/mapreduce/Import.java   |  50 ++---
 .../hadoop/hbase/mapreduce/ImportTsv.java       |  29 ++-
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 107 ++++-----
 .../mapreduce/MultiTableInputFormatBase.java    |  14 +-
 .../hbase/mapreduce/TableInputFormatBase.java   |   2 +-
 .../hadoop/hbase/mapreduce/WALPlayer.java       |  31 ++-
 .../hadoop/hbase/util/RegionSizeCalculator.java |   2 +-
 .../apache/hadoop/hbase/wal/WALSplitter.java    |  34 +--
 .../hadoop/hbase/HBaseTestingUtility.java       | 103 ++++-----
 .../hadoop/hbase/TestRegionRebalancing.java     | 106 ++++-----
 .../hadoop/hbase/client/TestFromClientSide.java | 116 +++++-----
 .../hbase/mapreduce/TestHFileOutputFormat.java  |  24 +--
 .../hbase/mapreduce/TestHFileOutputFormat2.java | 147 ++++++------
 .../TestLoadIncrementalHFilesSplitRecovery.java |  12 +-
 .../mapreduce/TestMultiTableInputFormat.java    |  11 +-
 .../regionserver/TestServerCustomProtocol.java  | 216 +++++++++----------
 25 files changed, 599 insertions(+), 803 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 7bfa972..88e045b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -706,7 +706,11 @@ class ConnectionManager {
 
   @Override
   public RegionLocator getRegionLocator(TableName tableName) throws IOException {
-    return new HRegionLocator(tableName, this);
+    if (managed) {
+      throw new IOException("The connection has to be unmanaged.");
+    }
+    return new HTable(
+      tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, getBatchPool());
   }
 
   @Override
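With this revert the connection hands back an HTable as the RegionLocator (and refuses to do so on a managed connection). For context, a minimal caller-side sketch of that path, assuming a hypothetical table "t1":

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionLocatorExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // ConnectionFactory hands out unmanaged connections, so getRegionLocator
    // succeeds; a managed connection would throw per the check above.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         RegionLocator locator = connection.getRegionLocator(TableName.valueOf("t1"))) {
      HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("some-row"));
      System.out.println("some-row is served by " + loc.getServerName());
    }
  }
}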
http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java
deleted file mode 100644
index fa85653..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * An implementation of {@link RegionLocator}. Used to view region location information for a single
- * HBase table. Lightweight. Get as needed and just close when done. Instances of this class SHOULD
- * NOT be constructed directly. Obtain an instance via {@link Connection}. See
- * {@link ConnectionFactory} class comment for an example of how.
- *
- * <p> This class is thread safe
- */
-@InterfaceAudience.Private
-@InterfaceStability.Stable
-public class HRegionLocator implements RegionLocator {
-
-  private final TableName tableName;
-  private final ClusterConnection connection;
-
-  public HRegionLocator(TableName tableName, ClusterConnection connection) {
-    this.connection = connection;
-    this.tableName = tableName;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void close() throws IOException {
-    // This method is required by the RegionLocator interface. This implementation does not have any
-    // persistent state, so there is no need to do anything here.
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public HRegionLocation getRegionLocation(final byte [] row)
-  throws IOException {
-    return connection.getRegionLocation(tableName, row, false);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
-  throws IOException {
-    return connection.getRegionLocation(tableName, row, reload);
-  }
-
-  @Override
-  public List<HRegionLocation> getAllRegionLocations() throws IOException {
-    NavigableMap<HRegionInfo, ServerName> locations =
-        MetaScanner.allTableRegions(this.connection, getName());
-    ArrayList<HRegionLocation> regions = new ArrayList<>(locations.size());
-    for (Entry<HRegionInfo, ServerName> entry : locations.entrySet()) {
-      regions.add(new HRegionLocation(entry.getKey(), entry.getValue()));
-    }
-    return regions;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public byte[][] getStartKeys() throws IOException {
-    return getStartEndKeys().getFirst();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public byte[][] getEndKeys() throws IOException {
-    return getStartEndKeys().getSecond();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
-    return getStartEndKeys(listRegionLocations());
-  }
-
-  @VisibleForTesting
-  Pair<byte[][], byte[][]> getStartEndKeys(List<RegionLocations> regions) {
-    final byte[][] startKeyList = new byte[regions.size()][];
-    final byte[][] endKeyList = new byte[regions.size()][];
-
-    for (int i = 0; i < regions.size(); i++) {
-      HRegionInfo region = regions.get(i).getRegionLocation().getRegionInfo();
-      startKeyList[i] = region.getStartKey();
-      endKeyList[i] = region.getEndKey();
-    }
-
-    return new Pair<>(startKeyList, endKeyList);
-  }
-
-  @Override
-  public TableName getName() {
-    return this.tableName;
-  }
-
-  @VisibleForTesting
-  List<RegionLocations> listRegionLocations() throws IOException {
-    return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
-  }
-
-  public Configuration getConfiguration() {
-    return connection.getConfiguration();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 13ec97d..c141b29 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -108,7 +110,7 @@
  */
 @InterfaceAudience.Private
 @InterfaceStability.Stable
-public class HTable implements HTableInterface {
+public class HTable implements HTableInterface, RegionLocator {
   private static final Log LOG = LogFactory.getLog(HTable.class);
   protected ClusterConnection connection;
   private final TableName tableName;
@@ -125,7 +127,6 @@ public class HTable implements HTableInterface {
   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
   private final boolean cleanupConnectionOnClose; // close the connection in close()
   private Consistency defaultConsistency = Consistency.STRONG;
-  private HRegionLocator locator;
 
   /** The Async process for puts with autoflush set to false or multiputs */
   protected AsyncProcess ap;
@@ -366,7 +367,6 @@ public class HTable implements HTableInterface {
     // puts need to track errors globally due to how the APIs currently work.
     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
     multiAp = this.connection.getAsyncProcess();
-    this.locator = new HRegionLocator(getName(), connection);
     this.closed = false;
   }
 
@@ -478,25 +478,25 @@ public class HTable implements HTableInterface {
   @Deprecated
   public HRegionLocation getRegionLocation(final String row)
   throws IOException {
-    return getRegionLocation(Bytes.toBytes(row), false);
+    return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
   }
 
   /**
-   * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
+   * {@inheritDoc}
    */
-  @Deprecated
+  @Override
   public HRegionLocation getRegionLocation(final byte [] row)
   throws IOException {
-    return locator.getRegionLocation(row);
+    return connection.getRegionLocation(tableName, row, false);
   }
 
   /**
-   * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
+   * {@inheritDoc}
    */
-  @Deprecated
+  @Override
   public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
   throws IOException {
-    return locator.getRegionLocation(row, reload);
+    return connection.getRegionLocation(tableName, row, reload);
   }
 
   /**
@@ -602,27 +602,45 @@ public class HTable implements HTableInterface {
   }
 
   /**
-   * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
+   * {@inheritDoc}
    */
-  @Deprecated
+  @Override
   public byte [][] getStartKeys() throws IOException {
-    return locator.getStartKeys();
+    return getStartEndKeys().getFirst();
   }
 
   /**
-   * @deprecated Use {@link RegionLocator#getEndKeys()} instead;
+   * {@inheritDoc}
   */
-  @Deprecated
+  @Override
   public byte[][] getEndKeys() throws IOException {
-    return locator.getEndKeys();
+    return getStartEndKeys().getSecond();
   }
 
   /**
-   * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
+   * {@inheritDoc}
   */
-  @Deprecated
+  @Override
   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
-    return locator.getStartEndKeys();
+
+    List<RegionLocations> regions = listRegionLocations();
+    final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size());
+    final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size());
+
+    for (RegionLocations locations : regions) {
+      HRegionInfo region = locations.getRegionLocation().getRegionInfo();
+      startKeyList.add(region.getStartKey());
+      endKeyList.add(region.getEndKey());
+    }
+
+    return new Pair<byte [][], byte [][]>(
+      startKeyList.toArray(new byte[startKeyList.size()][]),
+      endKeyList.toArray(new byte[endKeyList.size()][]));
+  }
+
+  @VisibleForTesting
+  List<RegionLocations> listRegionLocations() throws IOException {
+    return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
   }
 
  /**
@@ -645,12 +663,15 @@ public class HTable implements HTableInterface {
   * This is mainly useful for the MapReduce integration.
   * @return A map of HRegionInfo with it's server address
   * @throws IOException if a remote or network exception occurs
-   *
-   * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
   */
-  @Deprecated
+  @Override
  public List<HRegionLocation> getAllRegionLocations() throws IOException {
-    return locator.getAllRegionLocations();
+    NavigableMap<HRegionInfo, ServerName> locations = getRegionLocations();
+    ArrayList<HRegionLocation> regions = new ArrayList<>(locations.size());
+    for (Entry<HRegionInfo, ServerName> entry : locations.entrySet()) {
+      regions.add(new HRegionLocation(entry.getKey(), entry.getValue()));
+    }
+    return regions;
  }
 
  /**
@@ -1907,8 +1928,4 @@ public class HTable implements HTableInterface {
        callbackErrorServers);
    }
  }
-
-  public RegionLocator getRegionLocator() {
-    return this.locator;
-  }
 }
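Since HTable again implements RegionLocator directly after this revert, the table handle itself answers region-boundary queries. A minimal sketch under that assumption, with a hypothetical table name "t1":

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class StartEndKeysExample {
  public static void main(String[] args) throws IOException {
    // The HTable doubles as its own RegionLocator on this branch.
    HTable table = new HTable(HBaseConfiguration.create(), TableName.valueOf("t1"));
    try {
      Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
      for (int i = 0; i < keys.getFirst().length; i++) {
        System.out.println("region " + i
            + ": [" + Bytes.toStringBinary(keys.getFirst()[i])
            + ", " + Bytes.toStringBinary(keys.getSecond()[i]) + ")");
      }
    } finally {
      table.close();
    }
  }
}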
http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java
index 83c9883..ab77ceb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java
@@ -62,7 +62,7 @@ public class HTableUtil {
    */
   public static void bucketRsPut(HTable htable, List<Put> puts) throws IOException {
 
-    Map<String, List<Put>> putMap = createRsPutMap(htable.getRegionLocator(), puts);
+    Map<String, List<Put>> putMap = createRsPutMap(htable, puts);
     for (List<Put> rsPuts: putMap.values()) {
       htable.put( rsPuts );
     }
@@ -92,7 +92,7 @@ public class HTableUtil {
   public static void bucketRsBatch(HTable htable, List<Row> rows) throws IOException {
 
     try {
-      Map<String, List<Row>> rowMap = createRsRowMap(htable.getRegionLocator(), rows);
+      Map<String, List<Row>> rowMap = createRsRowMap(htable, rows);
       for (List<Row> rsRows: rowMap.values()) {
        htable.batch( rsRows );
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index 7c07a99..40ca4a4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
   // Public because used outside of this package over in ipc.
   static final Log LOG = LogFactory.getLog(RegionServerCallable.class);
-  protected final Connection connection;
+  protected final HConnection connection;
   protected final TableName tableName;
   protected final byte[] row;
   protected HRegionLocation location;
@@ -61,7 +61,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
   * @param tableName Table name to which <code>row</code> belongs.
   * @param row The row we want in <code>tableName</code>.
   */
-  public RegionServerCallable(Connection connection, TableName tableName, byte [] row) {
+  public RegionServerCallable(HConnection connection, TableName tableName, byte [] row) {
    this.connection = connection;
    this.tableName = tableName;
    this.row = row;
@@ -75,9 +75,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
   */
  @Override
  public void prepare(final boolean reload) throws IOException {
-    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
-      this.location = regionLocator.getRegionLocation(row, reload);
-    }
+    this.location = connection.getRegionLocation(tableName, row, reload);
    if (this.location == null) {
      throw new IOException("Failed to find location, tableName=" + tableName +
        ", row=" + Bytes.toString(row) + ", reload=" + reload);
@@ -89,7 +87,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> {
   * @return {@link HConnection} instance used by this Callable.
   */
  HConnection getConnection() {
-    return (HConnection) this.connection;
+    return this.connection;
  }
 
  protected ClientService.BlockingInterface getStub() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index 61746b5..8e532e5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -56,6 +56,7 @@ import java.rmi.UnknownHostException;
 import java.util.List;
 import java.util.ArrayList;
 
+import javax.annotation.Nullable;
 
 /**
  * Utility class to perform operation (get/wait for/verify/set/delete) on znode in ZooKeeper
@@ -126,6 +127,7 @@ public class MetaTableLocator {
   * @param zkw zookeeper connection to use
   * @return server name or null if we failed to get the data.
   */
+  @Nullable
  public ServerName getMetaRegionLocation(final ZooKeeperWatcher zkw) {
    try {
      RegionState state = getMetaRegionState(zkw);
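The @Nullable annotation added above documents that getMetaRegionLocation can legitimately return null, so callers should guard. A small sketch of such a guard, assuming a ZooKeeperWatcher is already set up:

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

public class MetaLocationCheck {
  // Returns a printable location, honoring the @Nullable contract above.
  static String describeMetaLocation(ZooKeeperWatcher zkw) {
    ServerName metaLocation = new MetaTableLocator().getMetaRegionLocation(zkw);
    if (metaLocation == null) {
      return "hbase:meta location not (yet) published in ZooKeeper";
    }
    return "hbase:meta is on " + metaLocation;
  }
}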
http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index 8773d0a..0ad65c3 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -18,10 +18,17 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import static org.junit.Assert.assertEquals;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Joiner;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
@@ -35,23 +42,20 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.IntegrationTestBase;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Consistency;
-import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.RegionSplitter;
@@ -75,15 +79,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Test Bulk Load and MR on a distributed cluster.
@@ -251,6 +247,7 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
       EnvironmentEdgeManager.currentTime();
     Configuration conf = new Configuration(util.getConfiguration());
     Path p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
+    HTable table = new HTable(conf, getTablename());
 
     conf.setBoolean("mapreduce.map.speculative", false);
     conf.setBoolean("mapreduce.reduce.speculative", false);
@@ -276,23 +273,18 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
     // Set where to place the hfiles.
     FileOutputFormat.setOutputPath(job, p);
 
-    try (Connection conn = ConnectionFactory.createConnection(conf);
-        Admin admin = conn.getAdmin();
-        Table table = conn.getTable(getTablename());
-        RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
-
-      // Configure the partitioner and other things needed for HFileOutputFormat.
-      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
-      // Run the job making sure it works.
-      assertEquals(true, job.waitForCompletion(true));
+    // Configure the partitioner and other things needed for HFileOutputFormat.
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
 
-      // Create a new loader.
-      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    // Run the job making sure it works.
+    assertEquals(true, job.waitForCompletion(true));
 
-      // Load the HFiles in.
-      loader.doBulkLoad(p, admin, table, regionLocator);
-    }
+    // Create a new loader.
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+
+    // Load the HFiles in.
+    loader.doBulkLoad(p, table);
 
     // Delete the files.
     util.getTestFileSystem().delete(p, true);
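The hunk above condenses the reverted bulk-load flow: one HTable is passed as both the Table and the RegionLocator argument. A standalone sketch of the same flow, with hypothetical table name and output path, and with the mapper/input setup of a real job elided:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "bulk-load-example");
    Path out = new Path("/tmp/hfiles");  // hypothetical output path
    FileOutputFormat.setOutputPath(job, out);

    // One HTable serves as both the Table and the RegionLocator argument.
    HTable table = new HTable(conf, TableName.valueOf("t1"));
    HFileOutputFormat2.configureIncrementalLoad(job, table, table);

    if (job.waitForCompletion(true)) {
      new LoadIncrementalHFiles(conf).doBulkLoad(out, table);
    }
    table.close();
  }
}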
http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
index 6d6feb1..402381b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
@@ -88,8 +87,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
   */
  public static void configureIncrementalLoad(Job job, HTable table)
      throws IOException {
-    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
-        table.getRegionLocator());
+    HFileOutputFormat2.configureIncrementalLoad(job, table, table);
  }
 
  /**
@@ -152,8 +150,20 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
    HFileOutputFormat2.configurePartitioner(job, splitPoints);
  }
 
+  /**
+   * Serialize column family to compression algorithm map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
+  @VisibleForTesting
  static void configureCompression(Table table, Configuration conf) throws IOException {
-    HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());
+    HFileOutputFormat2.configureCompression(table, conf);
  }
 
  /**
@@ -167,7 +177,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
   */
  @VisibleForTesting
  static void configureBlockSize(Table table, Configuration conf) throws IOException {
-    HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);
+    HFileOutputFormat2.configureBlockSize(table, conf);
  }
 
  /**
@@ -181,7 +191,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
   */
  @VisibleForTesting
  static void configureBloomType(Table table, Configuration conf) throws IOException {
-    HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);
+    HFileOutputFormat2.configureBloomType(table, conf);
  }
 
  /**
@@ -196,7 +206,6 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
  @VisibleForTesting
  static void configureDataBlockEncoding(Table table, Configuration conf)
      throws IOException {
-    HTableDescriptor tableDescriptor = table.getTableDescriptor();
-    HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
+    HFileOutputFormat2.configureDataBlockEncoding(table, conf);
  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 513bf40..a4d2425 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -80,7 +80,7 @@ import com.google.common.annotations.VisibleForTesting;
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, HTableDescriptor, RegionLocator, Class)}.
+ * using {@link #configureIncrementalLoad(Job, Table, RegionLocator, Class)}.
 */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -364,7 +364,7 @@ public class HFileOutputFormat2
  @Deprecated
  public static void configureIncrementalLoad(Job job, HTable table)
      throws IOException {
-    configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
+    configureIncrementalLoad(job, table, table);
  }
 
  /**
@@ -383,32 +383,13 @@
   */
  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
      throws IOException {
-    configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
+    configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class);
  }
 
-  /**
-   * Configure a MapReduce Job to perform an incremental load into the given
-   * table. This
-   * <ul>
-   *   <li>Inspects the table to configure a total order partitioner</li>
-   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
-   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-   *     PutSortReducer)</li>
-   * </ul>
-   * The user should be sure to set the map output value class to either KeyValue or Put before
-   * running this function.
-   */
-  public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
-      RegionLocator regionLocator) throws IOException {
-    configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class);
-  }
-
-  static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
-      RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException,
-      UnsupportedEncodingException {
+  static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator,
+      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
    Configuration conf = job.getConfiguration();
+
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(cls);
@@ -431,7 +412,7 @@
        KeyValueSerialization.class.getName());
 
    // Use table's region boundaries for TOP split points.
-    LOG.info("Looking up current regions for table " + regionLocator.getName());
+    LOG.info("Looking up current regions for table " + table.getName());
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
@@ -439,14 +420,14 @@
    configurePartitioner(job, startKeys);
    // Set compression algorithms based on column families
-    configureCompression(conf, tableDescriptor);
-    configureBloomType(tableDescriptor, conf);
-    configureBlockSize(tableDescriptor, conf);
-    configureDataBlockEncoding(tableDescriptor, conf);
+    configureCompression(table, conf);
+    configureBloomType(table, conf);
+    configureBlockSize(table, conf);
+    configureDataBlockEncoding(table, conf);
 
    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
-    LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
+    LOG.info("Incremental table " + table.getName() + " output configured.");
  }
 
  public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
@@ -457,11 +438,10 @@
    job.setOutputFormatClass(HFileOutputFormat2.class);
 
    // Set compression algorithms based on column families
-    configureCompression(conf, table.getTableDescriptor());
-    configureBloomType(table.getTableDescriptor(), conf);
-    configureBlockSize(table.getTableDescriptor(), conf);
-    HTableDescriptor tableDescriptor = table.getTableDescriptor();
-    configureDataBlockEncoding(tableDescriptor, conf);
+    configureCompression(table, conf);
+    configureBloomType(table, conf);
+    configureBlockSize(table, conf);
+    configureDataBlockEncoding(table, conf);
 
    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
@@ -610,9 +590,10 @@
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
      value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
  @VisibleForTesting
-  static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
-      throws UnsupportedEncodingException {
+  static void configureCompression(
+      Table table, Configuration conf) throws IOException {
    StringBuilder compressionConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if(tableDescriptor == null){
      // could happen with mock table instance
      return;
@@ -636,16 +617,17 @@
  /**
   * Serialize column family to block size map to configuration.
   * Invoked while configuring the MR job for incremental load.
-   * @param tableDescriptor to read the properties from
-   * @param conf to persist serialized values into
   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @VisibleForTesting
-  static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
-      throws UnsupportedEncodingException {
+  static void configureBlockSize(
+      Table table, Configuration conf) throws IOException {
    StringBuilder blockSizeConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
@@ -669,15 +651,16 @@
  /**
   * Serialize column family to bloom type map to configuration.
   * Invoked while configuring the MR job for incremental load.
-   * @param tableDescriptor to read the properties from
-   * @param conf to persist serialized values into
   *
+   * @param table to read the properties from
+   * @param conf to persist serialized values into
   * @throws IOException
   *           on failure to read column family descriptors
   */
  @VisibleForTesting
-  static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
-      throws UnsupportedEncodingException {
+  static void configureBloomType(
+      Table table, Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;
@@ -711,8 +694,9 @@
   *           on failure to read column family descriptors
   */
  @VisibleForTesting
-  static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
-      Configuration conf) throws UnsupportedEncodingException {
+  static void configureDataBlockEncoding(Table table,
+      Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
    if (tableDescriptor == null) {
      // could happen with mock table instance
      return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
index 6b4dc24..399d607 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -18,8 +18,20 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -29,18 +41,13 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -53,16 +60,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
 
 /**
  * Import data written by {@link Export}.
@@ -444,18 +441,15 @@ public class Import {
 
    if (hfileOutPath != null) {
      job.setMapperClass(KeyValueImporter.class);
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)){
-        job.setReducerClass(KeyValueSortReducer.class);
-        Path outputDir = new Path(hfileOutPath);
-        FileOutputFormat.setOutputPath(job, outputDir);
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(KeyValue.class);
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
-        TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-            com.google.common.base.Preconditions.class);
-      }
+      HTable table = new HTable(conf, tableName);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(KeyValue.class);
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
+      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+          com.google.common.base.Preconditions.class);
    } else {
      // No reducers.  Just write straight to table.  Call initTableReducerJob
      // because it sets up the TableOutputFormat.
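The configureCompression/configureBlockSize/configureBloomType helpers touched above all follow the same serialization pattern: each column family attribute becomes one URL-encoded "family=value" pair, and the pairs are joined with '&' under a single Configuration key. A simplified sketch of that pattern, not the exact HBase code, with an illustrative key name in place of the real constant:

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;

public class FamilyAttributeSerialization {
  // Illustrative key name; the real constant lives in HFileOutputFormat2.
  static final String EXAMPLE_KEY = "example.hfileoutputformat.families.compression";

  static void serializeCompression(HTableDescriptor tableDescriptor, Configuration conf)
      throws UnsupportedEncodingException {
    if (tableDescriptor == null) {
      return;  // could happen with a mock table instance
    }
    StringBuilder value = new StringBuilder();
    int i = 0;
    for (HColumnDescriptor family : tableDescriptor.getColumnFamilies()) {
      if (i++ > 0) {
        value.append('&');
      }
      value.append(URLEncoder.encode(family.getNameAsString(), "UTF-8"));
      value.append('=');
      value.append(URLEncoder.encode(family.getCompression().getName(), "UTF-8"));
    }
    conf.set(EXAMPLE_KEY, value.toString());
  }
}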
http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 90f2f0e..54e0034 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -20,13 +20,17 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static java.lang.String.format;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -36,14 +40,11 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -58,11 +59,9 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 
 /**
  * Tool to import data from a TSV file.
@@ -497,8 +496,7 @@ public class ImportTsv extends Configured implements Tool {
          throw new TableNotFoundException(errorMsg);
        }
      }
-      try (Table table = connection.getTable(tableName);
-          RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
+      try (HTable table = (HTable)connection.getTable(tableName)) {
        boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
        // if no.strict is false then check column family
        if(!noStrict) {
@@ -536,8 +534,7 @@ public class ImportTsv extends Configured implements Tool {
            job.setMapOutputValueClass(Put.class);
            job.setCombinerClass(PutCombiner.class);
          }
-          HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
-              regionLocator);
+          HFileOutputFormat2.configureIncrementalLoad(job, table, table);
        }
      } else {
        if (!admin.tableExists(tableName)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 6a9b52e..b4b6adc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -20,21 +20,42 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static java.lang.String.format;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -43,14 +64,10 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.Table;
@@ -78,30 +95,12 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
@@ -236,24 +235,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
  public void doBulkLoad(Path hfofDir, final HTable table)
      throws TableNotFoundException, IOException {
 
-    doBulkLoad(hfofDir, table.getConnection().getAdmin(), table, table.getRegionLocator());
-  }
-
-  /**
-   * Perform a bulk load of the given directory into the given
-   * pre-existing table.  This method is not threadsafe.
-   *
-   * @param hfofDir the directory that was provided as the output path
-   * of a job using HFileOutputFormat
-   * @param table the table to load into
-   * @throws TableNotFoundException if table does not yet exist
-   */
-  @SuppressWarnings("deprecation")
-  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
-      RegionLocator regionLocator) throws TableNotFoundException, IOException {
+    final HConnection conn = table.getConnection();
 
-    if (!admin.isTableAvailable(regionLocator.getName())) {
-      throw new TableNotFoundException("Table " + table.getName() + "is not currently available.");
+    if (!conn.isTableAvailable(table.getName())) {
+      throw new TableNotFoundException("Table " +
+          Bytes.toStringBinary(table.getTableName()) +
+          "is not currently available.");
    }
 
    // initialize thread pools
@@ -289,7 +276,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
      String msg = "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
          + unmatchedFamilies + "; valid family names of table "
-          + table.getName() + " are: " + familyNames;
+          + Bytes.toString(table.getTableName()) + " are: " + familyNames;
      LOG.error(msg);
      throw new IOException(msg);
    }
@@ -313,7 +300,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
      // Assumes that region splits can happen while this occurs.
      while (!queue.isEmpty()) {
        // need to reload split keys each iteration.
-        final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
+        final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys();
        if (count != 0) {
          LOG.info("Split occured while grouping HFiles, retry attempt " +
              + count + " with " + queue.size() + " files remaining to group or split");
@@ -336,7 +323,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
              + " hfiles to one family of one region");
        }
 
-        bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
+        bulkLoadPhase(table, conn, pool, queue, regionGroups);
 
        // NOTE: The next iteration's split / group could happen in parallel to
        // atomic bulkloads assuming that there are splits and no merges, and
@@ -372,7 +359,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   * them.  Any failures are re-queued for another pass with the
   * groupOrSplitPhase.
   */
-  protected void bulkLoadPhase(final Table table, final Connection conn,
+  protected void bulkLoadPhase(final Table table, final HConnection conn,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
    // atomically bulk load the groups.
@@ -444,7 +431,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
   * bulk load region targets.
   */
-  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table,
+  private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> need synchronized only within this scope of this
@@ -537,7 +524,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   * @throws IOException
   */
  protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
-      final LoadQueueItem item, final Table table,
+      final LoadQueueItem item, final HTable table,
      final Pair<byte[][], byte[][]> startEndKeys)
      throws IOException {
    final Path hfilePath = item.hfilePath;
@@ -582,18 +569,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table "
-          + table.getName()
+          + Bytes.toString(table.getTableName())
          + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
        && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table "
-          + table.getName()
+          + Bytes.toString(table.getTableName())
          + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length
        && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table "
-          + table.getName()
+          + Bytes.toString(table.getTableName())
          + " is not equal to the startkey of the next region in hbase:meta."
          + "Please use hbck tool to fix it first.");
    }
@@ -614,7 +601,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
  }
 
  /**
-   * @deprecated Use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)}
+   * @deprecated Use {@link #tryAtomicRegionLoad(HConnection, TableName, byte[], Collection)}
   */
  @Deprecated
  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
@@ -636,7 +623,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   * @return empty list if success, list of items to retry on recoverable
   * failure
   */
-  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
+  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
      final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
      throws IOException {
    final List<Pair<byte[], String>> famPaths =

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
index 890cfdd..b9a2db7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
@@ -155,10 +155,14 @@ public abstract class MultiTableInputFormatBase extends
      Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
      TableName tableName = entry.getKey();
      List<Scan> scanList = entry.getValue();
+      Table table = null;
+      RegionLocator regionLocator = null;
+      Connection conn = null;
 
-      try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+      try{
+        conn = ConnectionFactory.createConnection(context.getConfiguration());
+        table = conn.getTable(tableName);
+        regionLocator = (RegionLocator) table;
        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
            regionLocator, conn.getAdmin());
        Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
@@ -206,6 +210,10 @@ public abstract class MultiTableInputFormatBase extends
            }
          }
        }
+      } finally {
+        if (null != table) table.close();
+        if (null != regionLocator) regionLocator.close();
+        if (null != conn) conn.close();
      }
    }

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
index 3bf001b..4123467 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java
@@ -355,7 +355,7 @@ extends InputFormat<ImmutableBytesWritable, Result> {
  @Deprecated
  protected void setHTable(HTable table) throws IOException {
    this.table = table;
-    this.regionLocator = table.getRegionLocator();
+    this.regionLocator = table;
    this.admin = table.getConnection().getAdmin();
  }
http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index fb888aa..a487878 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -17,8 +17,16 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
@@ -28,19 +36,14 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -49,12 +52,6 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Map;
-import java.util.TreeMap;
-
 /**
  * A tool to replay WAL files as a M/R job.
  * The WAL can be replayed for a set of tables or all tables,
@@ -260,17 +257,13 @@ public class WALPlayer extends Configured implements Tool {
      if (tables.length != 1) {
        throw new IOException("Exactly one table must be specified for the bulk export option");
      }
-      TableName tableName = TableName.valueOf(tables[0]);
+      HTable table = new HTable(conf, TableName.valueOf(tables[0]));
      job.setMapperClass(WALKeyValueMapper.class);
      job.setReducerClass(KeyValueSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputValueClass(KeyValue.class);
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
-      }
+      HFileOutputFormat2.configureIncrementalLoad(job, table, table);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
          com.google.common.base.Preconditions.class);
    } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java
index 4f7c0a5..92c4410 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java
@@ -68,7 +68,7 @@ public class RegionSizeCalculator {
  public RegionSizeCalculator(HTable table) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
    try {
-      init(table.getRegionLocator(), admin);
+      init(table, admin);
    } finally {
      admin.close();
    }

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 144f6e4..d7d4a61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -86,7 +86,6 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@@ -121,7 +120,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -1818,29 +1816,17 @@ public class WALSplitter {
      RegionServerWriter rsw = null;
 
      long startTime = System.nanoTime();
-      boolean shouldRetry = true;
-      int retryCount = 0;
-      while (shouldRetry) {
-        try {
-          rsw = getRegionServerWriter(key);
-          rsw.sink.replayEntries(actions);
+      try {
+        rsw = getRegionServerWriter(key);
+        rsw.sink.replayEntries(actions);
 
-          // Pass along summary statistics
-          rsw.incrementEdits(actions.size());
-          rsw.incrementNanoTime(System.nanoTime() - startTime);
-          shouldRetry = false;
-        } catch (IOException e) {
-          e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
-          LOG.fatal("Got while writing log entry to log", e);
-          if (e instanceof RetriesExhaustedException &&
-              ((RetriesExhaustedException)e).getMessage().contains("RegionTooBusyException")) {
-            // keep retrying
-            retryCount++;
-            LOG.warn("replayEntries retry count " + retryCount);
-            continue;
-          }
-          throw e;
-        }
+        // Pass along summary statistics
+        rsw.incrementEdits(actions.size());
+        rsw.incrementNanoTime(System.nanoTime() - startTime);
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.fatal(" Got while writing log entry to log", e);
+        throw e;
      }
    }

http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index e5d17af..a7ac01d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -2130,55 +2130,56 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
      final byte[] columnFamily, byte [][] startKeys)
  throws IOException {
    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
-    try (Table meta = new HTable(c, TableName.META_TABLE_NAME)) {
-      HTableDescriptor htd = table.getTableDescriptor();
-      if(!htd.hasFamily(columnFamily)) {
-        HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
-        htd.addFamily(hcd);
-      }
-      // remove empty region - this is tricky as the mini cluster during the test
-      // setup already has the "<tablename>,,123456789" row with an empty start
-      // and end key. Adding the custom regions below adds those blindly,
-      // including the new start region from empty to "bbb". lg
-      List<byte[]> rows = getMetaTableRows(htd.getTableName());
-      String regionToDeleteInFS = table
-          .getRegionsInRange(Bytes.toBytes(""), Bytes.toBytes("")).get(0)
-          .getRegionInfo().getEncodedName();
-      List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
-      // add custom ones
-      int count = 0;
-      for (int i = 0; i < startKeys.length; i++) {
-        int j = (i + 1) % startKeys.length;
-        HRegionInfo hri = new HRegionInfo(table.getName(),
-            startKeys[i], startKeys[j]);
-        MetaTableAccessor.addRegionToMeta(meta, hri);
-        newRegions.add(hri);
-        count++;
-      }
-      // see comment above, remove "old" (or previous) single region
-      for (byte[] row : rows) {
-        LOG.info("createMultiRegions: deleting meta row -> " +
-            Bytes.toStringBinary(row));
-        meta.delete(new Delete(row));
-      }
-      // remove the "old" region from FS
-      Path tableDir = new Path(getDefaultRootDirPath().toString()
-          + System.getProperty("file.separator") + htd.getTableName()
-          + System.getProperty("file.separator") + regionToDeleteInFS);
-      FileSystem.get(c).delete(tableDir, true);
-      // flush cache of regions
-      HConnection conn = table.getConnection();
-      conn.clearRegionCache();
-      // assign all the new regions IF table is enabled.
- Admin admin = conn.getAdmin(); - if (admin.isTableEnabled(table.getName())) { - for(HRegionInfo hri : newRegions) { - admin.assign(hri.getRegionName()); - } + Table meta = new HTable(c, TableName.META_TABLE_NAME); + HTableDescriptor htd = table.getTableDescriptor(); + if(!htd.hasFamily(columnFamily)) { + HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); + htd.addFamily(hcd); + } + // remove empty region - this is tricky as the mini cluster during the test + // setup already has the "<tablename>,,123456789" row with an empty start + // and end key. Adding the custom regions below adds those blindly, + // including the new start region from empty to "bbb". lg + List<byte[]> rows = getMetaTableRows(htd.getTableName()); + String regionToDeleteInFS = table + .getRegionsInRange(Bytes.toBytes(""), Bytes.toBytes("")).get(0) + .getRegionInfo().getEncodedName(); + List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length); + // add custom ones + int count = 0; + for (int i = 0; i < startKeys.length; i++) { + int j = (i + 1) % startKeys.length; + HRegionInfo hri = new HRegionInfo(table.getName(), + startKeys[i], startKeys[j]); + MetaTableAccessor.addRegionToMeta(meta, hri); + newRegions.add(hri); + count++; + } + // see comment above, remove "old" (or previous) single region + for (byte[] row : rows) { + LOG.info("createMultiRegions: deleting meta row -> " + + Bytes.toStringBinary(row)); + meta.delete(new Delete(row)); + } + // remove the "old" region from FS + Path tableDir = new Path(getDefaultRootDirPath().toString() + + System.getProperty("file.separator") + htd.getTableName() + + System.getProperty("file.separator") + regionToDeleteInFS); + FileSystem.get(c).delete(tableDir, true); + // flush cache of regions + HConnection conn = table.getConnection(); + conn.clearRegionCache(); + // assign all the new regions IF table is enabled. 
+ Admin admin = getHBaseAdmin(); + if (admin.isTableEnabled(table.getName())) { + for(HRegionInfo hri : newRegions) { + admin.assign(hri.getRegionName()); } - - return count; } + + meta.close(); + + return count; } /** @@ -3450,10 +3451,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } public static int getMetaRSPort(Configuration conf) throws IOException { - try (Connection c = ConnectionFactory.createConnection(); - RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) { - return locator.getRegionLocation(Bytes.toBytes("")).getPort(); - } + RegionLocator table = new HTable(conf, TableName.META_TABLE_NAME); + HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes("")); + table.close(); + return hloc.getPort(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java index 5a5005a..1f83201 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java @@ -26,6 +26,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -66,7 +68,7 @@ public class TestRegionRebalancing { private static final byte[] FAMILY_NAME = Bytes.toBytes("col"); public static final Log LOG = LogFactory.getLog(TestRegionRebalancing.class); private final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private RegionLocator regionLocator; + private RegionLocator table; private HTableDescriptor desc; private String balancerName; @@ -98,59 +100,59 @@ public class TestRegionRebalancing { @SuppressWarnings("deprecation") public void testRebalanceOnRegionServerNumberChange() throws IOException, InterruptedException { - try(Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); - Admin admin = connection.getAdmin()) { - admin.createTable(this.desc, Arrays.copyOfRange(HBaseTestingUtility.KEYS, - 1, HBaseTestingUtility.KEYS.length)); - this.regionLocator = connection.getRegionLocator(this.desc.getTableName()); - - MetaTableAccessor.fullScanMetaAndPrint(admin.getConnection()); - - assertEquals("Test table should have right number of regions", - HBaseTestingUtility.KEYS.length, - this.regionLocator.getStartKeys().length); - - // verify that the region assignments are balanced to start out - assertRegionsAreBalanced(); - - // add a region server - total of 2 - LOG.info("Started second server=" + + Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); + Admin admin = connection.getAdmin(); + admin.createTable(this.desc, Arrays.copyOfRange(HBaseTestingUtility.KEYS, + 1, HBaseTestingUtility.KEYS.length)); + this.table = new HTable(UTIL.getConfiguration(), this.desc.getTableName()); + + MetaTableAccessor.fullScanMetaAndPrint(admin.getConnection()); + + assertEquals("Test table should have 
right number of regions", + HBaseTestingUtility.KEYS.length, + this.table.getStartKeys().length); + + // verify that the region assignments are balanced to start out + assertRegionsAreBalanced(); + + // add a region server - total of 2 + LOG.info("Started second server=" + + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); + UTIL.getHBaseCluster().getMaster().balance(); + assertRegionsAreBalanced(); + + // On a balanced cluster, calling balance() should return true + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + + // if we add a server, then the balance() call should return true + // add a region server - total of 3 + LOG.info("Started third server=" + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - UTIL.getHBaseCluster().getMaster().balance(); - assertRegionsAreBalanced(); - - // On a balanced cluster, calling balance() should return true - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - - // if we add a server, then the balance() call should return true - // add a region server - total of 3 - LOG.info("Started third server=" + - UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - assertRegionsAreBalanced(); - - // kill a region server - total of 2 - LOG.info("Stopped third server=" + UTIL.getHBaseCluster().stopRegionServer(2, false)); - UTIL.getHBaseCluster().waitOnRegionServer(2); - UTIL.getHBaseCluster().getMaster().balance(); - assertRegionsAreBalanced(); - - // start two more region servers - total of 4 - LOG.info("Readding third server=" + - UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - LOG.info("Added fourth server=" + - UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - assertRegionsAreBalanced(); - - for (int i = 0; i < 6; i++){ - LOG.info("Adding " + (i + 5) + "th region server"); - UTIL.getHBaseCluster().startRegionServer(); - } - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - assertRegionsAreBalanced(); - regionLocator.close(); + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + assertRegionsAreBalanced(); + + // kill a region server - total of 2 + LOG.info("Stopped third server=" + UTIL.getHBaseCluster().stopRegionServer(2, false)); + UTIL.getHBaseCluster().waitOnRegionServer(2); + UTIL.getHBaseCluster().getMaster().balance(); + assertRegionsAreBalanced(); + + // start two more region servers - total of 4 + LOG.info("Readding third server=" + + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); + LOG.info("Added fourth server=" + + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + assertRegionsAreBalanced(); + + for (int i = 0; i < 6; i++){ + LOG.info("Adding " + (i + 5) + "th region server"); + UTIL.getHBaseCluster().startRegionServer(); } + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + assertRegionsAreBalanced(); + table.close(); + admin.close(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/173eba81/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index cfa2a38..8511f88 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -26,6 +26,24 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; @@ -89,22 +107,6 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; - /** * Run tests that use the HBase clients; {@link HTable}. * Sets up the HBase mini cluster once at start and runs through all client tests. @@ -5201,41 +5203,40 @@ public class TestFromClientSide { TableName TABLE = TableName.valueOf("testNonCachedGetRegionLocation"); byte [] family1 = Bytes.toBytes("f1"); byte [] family2 = Bytes.toBytes("f2"); - try (HTable table = TEST_UTIL.createTable(TABLE, new byte[][] {family1, family2}, 10); - Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration())) { - Map <HRegionInfo, ServerName> regionsMap = table.getRegionLocations(); - assertEquals(1, regionsMap.size()); - HRegionInfo regionInfo = regionsMap.keySet().iterator().next(); - ServerName addrBefore = regionsMap.get(regionInfo); - // Verify region location before move. - HRegionLocation addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); - HRegionLocation addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); - - assertEquals(addrBefore.getPort(), addrCache.getPort()); - assertEquals(addrBefore.getPort(), addrNoCache.getPort()); - - ServerName addrAfter = null; - // Now move the region to a different server. - for (int i = 0; i < SLAVES; i++) { - HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(i); - ServerName addr = regionServer.getServerName(); - if (addr.getPort() != addrBefore.getPort()) { - admin.move(regionInfo.getEncodedNameAsBytes(), - Bytes.toBytes(addr.toString())); - // Wait for the region to move. 
- Thread.sleep(5000); - addrAfter = addr; - break; - } + HTable table = TEST_UTIL.createTable(TABLE, new byte[][] {family1, family2}, 10); + Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); + Map <HRegionInfo, ServerName> regionsMap = table.getRegionLocations(); + assertEquals(1, regionsMap.size()); + HRegionInfo regionInfo = regionsMap.keySet().iterator().next(); + ServerName addrBefore = regionsMap.get(regionInfo); + // Verify region location before move. + HRegionLocation addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); + HRegionLocation addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); + + assertEquals(addrBefore.getPort(), addrCache.getPort()); + assertEquals(addrBefore.getPort(), addrNoCache.getPort()); + + ServerName addrAfter = null; + // Now move the region to a different server. + for (int i = 0; i < SLAVES; i++) { + HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(i); + ServerName addr = regionServer.getServerName(); + if (addr.getPort() != addrBefore.getPort()) { + admin.move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(addr.toString())); + // Wait for the region to move. + Thread.sleep(5000); + addrAfter = addr; + break; } - - // Verify the region was moved. - addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); - addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); - assertNotNull(addrAfter); - assertTrue(addrAfter.getPort() != addrCache.getPort()); - assertEquals(addrAfter.getPort(), addrNoCache.getPort()); } + + // Verify the region was moved. + addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); + addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); + assertNotNull(addrAfter); + assertTrue(addrAfter.getPort() != addrCache.getPort()); + assertEquals(addrAfter.getPort(), addrNoCache.getPort()); } @Test @@ -6246,13 +6247,10 @@ public class TestFromClientSide { HColumnDescriptor fam = new HColumnDescriptor(FAMILY); htd.addFamily(fam); byte[][] KEYS = HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE; - HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); - admin.createTable(htd, KEYS); - List<HRegionInfo> regions = admin.getTableRegions(htd.getTableName()); + TEST_UTIL.getHBaseAdmin().createTable(htd, KEYS); + List<HRegionInfo> regions = TEST_UTIL.getHBaseAdmin().getTableRegions(htd.getTableName()); - HRegionLocator locator = - (HRegionLocator) admin.getConnection().getRegionLocator(htd.getTableName()); - for (int regionReplication = 1; regionReplication < 4; regionReplication++) { + for (int regionReplication = 1; regionReplication < 4 ; regionReplication++) { List<RegionLocations> regionLocations = new ArrayList<RegionLocations>(); // mock region locations coming from meta with multiple replicas @@ -6264,7 +6262,10 @@ public class TestFromClientSide { regionLocations.add(new RegionLocations(arr)); } - Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys(regionLocations); + HTable table = spy(new HTable(TEST_UTIL.getConfiguration(), htd.getTableName())); + when(table.listRegionLocations()).thenReturn(regionLocations); + + Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys(); assertEquals(KEYS.length + 1, startEndKeys.getFirst().length); @@ -6274,6 +6275,9 @@ public class TestFromClientSide { assertArrayEquals(startKey, startEndKeys.getFirst()[i]); assertArrayEquals(endKey, startEndKeys.getSecond()[i]); } + + table.close(); } } + }
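Annotation (not part of the commit): besides the API moves, the WALSplitter hunk further up is a behavioral change — it drops a retry loop around replayEntries(). As a reference for what was removed, a condensed, hypothetical sketch of that keep-retrying-on-RegionTooBusyException pattern; the Replay interface and method name are stand-ins, while RemoteException and RetriesExhaustedException are the real classes the removed code used:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.RetriesExhaustedException;
    import org.apache.hadoop.ipc.RemoteException;

    public class ReplayRetrySketch {

      // Stand-in for rsw.sink.replayEntries(actions) in the removed code.
      interface Replay { void run() throws IOException; }

      static void replayWithRetry(Replay replay) throws IOException {
        int retryCount = 0;
        boolean shouldRetry = true;
        while (shouldRetry) {
          try {
            replay.run();
            shouldRetry = false; // success: leave the loop
          } catch (IOException e) {
            // Unwrap server-side exceptions forwarded over RPC.
            IOException cause = e instanceof RemoteException
                ? ((RemoteException) e).unwrapRemoteException() : e;
            if (cause instanceof RetriesExhaustedException
                && cause.getMessage() != null
                && cause.getMessage().contains("RegionTooBusyException")) {
              retryCount++; // busy region: loop and replay the batch again
              continue;
            }
            throw cause; // anything else propagates, as in the restored code
          }
        }
      }
    }

After the revert, a RegionTooBusyException surfacing through replayEntries() is fatal to the log-replay path again rather than being retried indefinitely.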

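Annotation (not part of the commit): stepping back, nearly every file in this revert tracks one API move — with HRegionLocator deleted, HTable again doubles as the RegionLocator, so callers hold (or cast to) the table handle instead of asking the Connection for a separate locator. A hedged usage sketch under that restored branch-1 API; the table name is a placeholder and only the client calls shown are real:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.util.Bytes;

    public class LocatorStyleSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tn = TableName.valueOf("demo"); // placeholder table

        // Restored style: the HTable handle is itself the RegionLocator,
        // which is what setHTable(), RegionSizeCalculator and the tests
        // in this patch rely on after the revert.
        RegionLocator locator = new HTable(conf, tn);
        try {
          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes(""));
          System.out.println("first region on " + loc.getHostnamePort());
        } finally {
          locator.close();
        }
      }
    }

This is also why the getRegionLocator() override in ConnectionManager can hand back a new HTable, and why the hunks above pass the table where a RegionLocator parameter is expected.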