HBASE-12783 Create efficient RegionLocator implementation (Solomon)
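This commit makes region lookups available through a dedicated, lightweight RegionLocator obtained from a Connection, rather than through HTable. A minimal sketch of the resulting client-side pattern (the table name, row key, and default configuration here are illustrative assumptions, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionLocatorUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("example_table"); // hypothetical table
    try (Connection connection = ConnectionFactory.createConnection(conf);
         RegionLocator locator = connection.getRegionLocator(tableName)) {
      // Cached lookup of the region hosting a row; the two-argument overload
      // takes reload=true to bypass the client-side region cache.
      HRegionLocation location = locator.getRegionLocation(Bytes.toBytes("row-0001"));
      System.out.println("row-0001 is served by " + location.getServerName());
    }
  }
}

Since the new locator keeps no state beyond a table name and a connection (its close() is a no-op, per the diff below), instances are cheap to obtain as needed.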
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2c4fe03e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2c4fe03e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2c4fe03e

Branch: refs/heads/branch-1
Commit: 2c4fe03e6a4f2868d43a055f475a7a2d48f51b11
Parents: 46f993b
Author: tedyu <[email protected]>
Authored: Wed Jan 7 11:10:24 2015 -0800
Committer: tedyu <[email protected]>
Committed: Wed Jan 7 11:10:24 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ConnectionManager.java  |   6 +-
 .../hadoop/hbase/client/HRegionLocator.java     | 148 +++++++++++++
 .../org/apache/hadoop/hbase/client/HTable.java  |  72 +++---
 .../apache/hadoop/hbase/client/HTableUtil.java  |   4 +-
 .../hbase/client/RegionServerCallable.java      |  10 +-
 .../hadoop/hbase/client/RpcRetryingCaller.java  |   2 +-
 .../hbase/zookeeper/MetaTableLocator.java       |   1 -
 .../mapreduce/IntegrationTestBulkLoad.java      |  52 +++--
 .../hbase/mapreduce/HFileOutputFormat.java      |  25 +--
 .../hbase/mapreduce/HFileOutputFormat2.java     |  78 ++++---
 .../apache/hadoop/hbase/mapreduce/Import.java   |  28 ++-
 .../hadoop/hbase/mapreduce/ImportTsv.java       |  29 +--
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 105 +++++----
 .../mapreduce/MultiTableInputFormatBase.java    |  14 +-
 .../hbase/mapreduce/TableInputFormatBase.java   |   2 +-
 .../hadoop/hbase/mapreduce/WALPlayer.java       |  31 +--
 .../hadoop/hbase/util/RegionSizeCalculator.java |   2 +-
 .../hadoop/hbase/HBaseTestingUtility.java       | 103 +++++----
 .../hadoop/hbase/TestRegionRebalancing.java     | 106 +++++----
 .../hbase/client/HConnectionTestingUtility.java |  20 ++
 .../hadoop/hbase/client/TestFromClientSide.java | 116 +++++-----
 .../hbase/mapreduce/TestHFileOutputFormat.java  |  21 +-
 .../hbase/mapreduce/TestHFileOutputFormat2.java | 145 +++++------
 .../TestLoadIncrementalHFilesSplitRecovery.java |  12 +-
 .../mapreduce/TestMultiTableInputFormat.java    |  11 +-
 .../regionserver/TestServerCustomProtocol.java  | 216 ++++++++++---------
 26 files changed, 786 insertions(+), 573 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 88e045b..7bfa972 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -706,11 +706,7 @@ class ConnectionManager {
 
   @Override
   public RegionLocator getRegionLocator(TableName tableName) throws IOException {
-    if (managed) {
-      throw new IOException("The connection has to be unmanaged.");
-    }
-    return new HTable(
-      tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, getBatchPool());
+    return new HRegionLocator(tableName, this);
   }
 
   @Override
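The hunk above drops the managed-connection check and hands back the new lightweight HRegionLocator (defined in the next file) instead of constructing a full HTable. A sketch of the kind of call this enables, enumerating region boundaries much as the MapReduce changes later in this patch do for reduce partitioning (the table name is an illustrative assumption):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

public class RegionBoundaries {
  public static void main(String[] args) throws Exception {
    TableName tableName = TableName.valueOf("example_table"); // hypothetical table
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
         RegionLocator locator = conn.getRegionLocator(tableName)) {
      // The region count is what drives the number of reduce partitions
      // in the incremental-load configuration below.
      Pair<byte[][], byte[][]> keys = locator.getStartEndKeys();
      System.out.println("Table has " + keys.getFirst().length + " regions");
      // Full region-to-server assignment, via the new getAllRegionLocations().
      for (HRegionLocation loc : locator.getAllRegionLocations()) {
        System.out.println(Bytes.toStringBinary(loc.getRegionInfo().getStartKey())
            + " -> " + loc.getServerName());
      }
    }
  }
}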
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java
new file mode 100644
index 0000000..fa85653
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java
@@ -0,0 +1,148 @@
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * An implementation of {@link RegionLocator}. Used to view region location information for a single
+ * HBase table. Lightweight. Get as needed and just close when done. Instances of this class SHOULD
+ * NOT be constructed directly. Obtain an instance via {@link Connection}. See
+ * {@link ConnectionFactory} class comment for an example of how.
+ *
+ * <p> This class is thread safe
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class HRegionLocator implements RegionLocator {
+
+  private final TableName tableName;
+  private final ClusterConnection connection;
+
+  public HRegionLocator(TableName tableName, ClusterConnection connection) {
+    this.connection = connection;
+    this.tableName = tableName;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    // This method is required by the RegionLocator interface. This implementation does not have any
+    // persistent state, so there is no need to do anything here.
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public HRegionLocation getRegionLocation(final byte [] row)
+  throws IOException {
+    return connection.getRegionLocation(tableName, row, false);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
+  throws IOException {
+    return connection.getRegionLocation(tableName, row, reload);
+  }
+
+  @Override
+  public List<HRegionLocation> getAllRegionLocations() throws IOException {
+    NavigableMap<HRegionInfo, ServerName> locations =
+        MetaScanner.allTableRegions(this.connection, getName());
+    ArrayList<HRegionLocation> regions = new ArrayList<>(locations.size());
+    for (Entry<HRegionInfo, ServerName> entry : locations.entrySet()) {
+      regions.add(new HRegionLocation(entry.getKey(), entry.getValue()));
+    }
+    return regions;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[][] getStartKeys() throws IOException {
+    return getStartEndKeys().getFirst();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[][] getEndKeys() throws IOException {
+    return getStartEndKeys().getSecond();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
+    return getStartEndKeys(listRegionLocations());
+  }
+
+  @VisibleForTesting
+  Pair<byte[][], byte[][]> getStartEndKeys(List<RegionLocations> regions) {
+    final byte[][] startKeyList = new byte[regions.size()][];
+    final byte[][] endKeyList = new byte[regions.size()][];
+
+    for (int i = 0; i < regions.size(); i++) {
+      HRegionInfo region = regions.get(i).getRegionLocation().getRegionInfo();
+      startKeyList[i] = region.getStartKey();
+      endKeyList[i] = region.getEndKey();
+    }
+
+    return new Pair<>(startKeyList, endKeyList);
+  }
+
+  @Override
+  public TableName getName() {
+    return this.tableName;
+  }
+
+  @VisibleForTesting
+  List<RegionLocations> listRegionLocations() throws IOException {
+    return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName());
+  }
+
+  public Configuration getConfiguration() {
+    return connection.getConfiguration();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index c141b29..43ace95 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -48,7 +47,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -110,7 +108,7 @@ import com.google.protobuf.ServiceException;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Stable
-public class HTable implements HTableInterface, RegionLocator {
+public class HTable implements HTableInterface {
   private
static final Log LOG = LogFactory.getLog(HTable.class); protected ClusterConnection connection; private final TableName tableName; @@ -127,6 +125,7 @@ public class HTable implements HTableInterface, RegionLocator { private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; + private HRegionLocator locator; /** The Async process for puts with autoflush set to false or multiputs */ protected AsyncProcess ap; @@ -369,6 +368,8 @@ public class HTable implements HTableInterface, RegionLocator { multiAp = this.connection.getAsyncProcess(); this.closed = false; + + this.locator = new HRegionLocator(tableName, connection); } /** @@ -478,25 +479,25 @@ public class HTable implements HTableInterface, RegionLocator { @Deprecated public HRegionLocation getRegionLocation(final String row) throws IOException { - return connection.getRegionLocation(tableName, Bytes.toBytes(row), false); + return getRegionLocation(Bytes.toBytes(row), false); } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead. */ - @Override + @Deprecated public HRegionLocation getRegionLocation(final byte [] row) throws IOException { - return connection.getRegionLocation(tableName, row, false); + return locator.getRegionLocation(row); } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead. */ - @Override + @Deprecated public HRegionLocation getRegionLocation(final byte [] row, boolean reload) throws IOException { - return connection.getRegionLocation(tableName, row, reload); + return locator.getRegionLocation(row, reload); } /** @@ -602,45 +603,27 @@ public class HTable implements HTableInterface, RegionLocator { } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead; */ - @Override + @Deprecated public byte [][] getStartKeys() throws IOException { - return getStartEndKeys().getFirst(); + return locator.getStartKeys(); } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getEndKeys()} instead; */ - @Override + @Deprecated public byte[][] getEndKeys() throws IOException { - return getStartEndKeys().getSecond(); + return locator.getEndKeys(); } /** - * {@inheritDoc} + * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead; */ - @Override + @Deprecated public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException { - - List<RegionLocations> regions = listRegionLocations(); - final List<byte[]> startKeyList = new ArrayList<byte[]>(regions.size()); - final List<byte[]> endKeyList = new ArrayList<byte[]>(regions.size()); - - for (RegionLocations locations : regions) { - HRegionInfo region = locations.getRegionLocation().getRegionInfo(); - startKeyList.add(region.getStartKey()); - endKeyList.add(region.getEndKey()); - } - - return new Pair<byte [][], byte [][]>( - startKeyList.toArray(new byte[startKeyList.size()][]), - endKeyList.toArray(new byte[endKeyList.size()][])); - } - - @VisibleForTesting - List<RegionLocations> listRegionLocations() throws IOException { - return MetaScanner.listTableRegionLocations(getConfiguration(), this.connection, getName()); + return locator.getStartEndKeys(); } /** @@ -663,15 +646,12 @@ public class HTable implements HTableInterface, RegionLocator { * This is mainly useful for the MapReduce integration. 
* @return A map of HRegionInfo with it's server address * @throws IOException if a remote or network exception occurs + * + * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead; */ - @Override + @Deprecated public List<HRegionLocation> getAllRegionLocations() throws IOException { - NavigableMap<HRegionInfo, ServerName> locations = getRegionLocations(); - ArrayList<HRegionLocation> regions = new ArrayList<>(locations.size()); - for (Entry<HRegionInfo, ServerName> entry : locations.entrySet()) { - regions.add(new HRegionLocation(entry.getKey(), entry.getValue())); - } - return regions; + return locator.getAllRegionLocations(); } /** @@ -1928,4 +1908,8 @@ public class HTable implements HTableInterface, RegionLocator { callbackErrorServers); } } + + public RegionLocator getRegionLocator() { + return this.locator; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java index ab77ceb..83c9883 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java @@ -62,7 +62,7 @@ public class HTableUtil { */ public static void bucketRsPut(HTable htable, List<Put> puts) throws IOException { - Map<String, List<Put>> putMap = createRsPutMap(htable, puts); + Map<String, List<Put>> putMap = createRsPutMap(htable.getRegionLocator(), puts); for (List<Put> rsPuts: putMap.values()) { htable.put( rsPuts ); } @@ -92,7 +92,7 @@ public class HTableUtil { public static void bucketRsBatch(HTable htable, List<Row> rows) throws IOException { try { - Map<String, List<Row>> rowMap = createRsRowMap(htable, rows); + Map<String, List<Row>> rowMap = createRsRowMap(htable.getRegionLocator(), rows); for (List<Row> rsRows: rowMap.values()) { htable.batch( rsRows ); } http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 40ca4a4..7c07a99 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.Bytes; public abstract class RegionServerCallable<T> implements RetryingCallable<T> { // Public because used outside of this package over in ipc. static final Log LOG = LogFactory.getLog(RegionServerCallable.class); - protected final HConnection connection; + protected final Connection connection; protected final TableName tableName; protected final byte[] row; protected HRegionLocation location; @@ -61,7 +61,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> { * @param tableName Table name to which <code>row</code> belongs. * @param row The row we want in <code>tableName</code>. 
*/ - public RegionServerCallable(HConnection connection, TableName tableName, byte [] row) { + public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { this.connection = connection; this.tableName = tableName; this.row = row; @@ -75,7 +75,9 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> { */ @Override public void prepare(final boolean reload) throws IOException { - this.location = connection.getRegionLocation(tableName, row, reload); + try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) { + this.location = regionLocator.getRegionLocation(row, reload); + } if (this.location == null) { throw new IOException("Failed to find location, tableName=" + tableName + ", row=" + Bytes.toString(row) + ", reload=" + reload); @@ -87,7 +89,7 @@ public abstract class RegionServerCallable<T> implements RetryingCallable<T> { * @return {@link HConnection} instance used by this Callable. */ HConnection getConnection() { - return this.connection; + return (HConnection) this.connection; } protected ClientService.BlockingInterface getStub() { http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 49c7efd..b2020bd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -98,8 +98,8 @@ public class RpcRetryingCaller<T> { } public void cancel(){ - cancelled.set(true); synchronized (cancelled){ + cancelled.set(true); cancelled.notifyAll(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java index 8e532e5..6bc5acc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -127,7 +127,6 @@ public class MetaTableLocator { * @param zkw zookeeper connection to use * @return server name or null if we failed to get the data. 
*/ - @Nullable public ServerName getMetaRegionLocation(final ZooKeeperWatcher zkw) { try { RegionState state = getMetaRegionState(zkw); http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java index 0ad65c3..8773d0a 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java @@ -18,17 +18,10 @@ */ package org.apache.hadoop.hbase.mapreduce; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; +import static org.junit.Assert.assertEquals; import com.google.common.base.Joiner; + import org.apache.commons.cli.CommandLine; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; @@ -42,20 +35,23 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.IntegrationTestBase; import org.apache.hadoop.hbase.IntegrationTestingUtility; -import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Consistency; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.RegionSplitter; @@ -79,7 +75,15 @@ import org.apache.hadoop.util.ToolRunner; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; /** * Test Bulk Load and MR on a distributed cluster. 
@@ -247,7 +251,6 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { EnvironmentEdgeManager.currentTime(); Configuration conf = new Configuration(util.getConfiguration()); Path p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration); - HTable table = new HTable(conf, getTablename()); conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false); @@ -273,18 +276,23 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase { // Set where to place the hfiles. FileOutputFormat.setOutputPath(job, p); + try (Connection conn = ConnectionFactory.createConnection(conf); + Admin admin = conn.getAdmin(); + Table table = conn.getTable(getTablename()); + RegionLocator regionLocator = conn.getRegionLocator(getTablename())) { + + // Configure the partitioner and other things needed for HFileOutputFormat. + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); - // Configure the partitioner and other things needed for HFileOutputFormat. - HFileOutputFormat2.configureIncrementalLoad(job, table, table); + // Run the job making sure it works. + assertEquals(true, job.waitForCompletion(true)); - // Run the job making sure it works. - assertEquals(true, job.waitForCompletion(true)); - - // Create a new loader. - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); + // Create a new loader. + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); - // Load the HFiles in. - loader.doBulkLoad(p, table); + // Load the HFiles in. + loader.doBulkLoad(p, admin, table, regionLocator); + } // Delete the files. util.getTestFileSystem().delete(p, true); http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java index 402381b..6d6feb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Table; @@ -87,7 +88,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, */ public static void configureIncrementalLoad(Job job, HTable table) throws IOException { - HFileOutputFormat2.configureIncrementalLoad(job, table, table); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), + table.getRegionLocator()); } /** @@ -150,20 +152,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, HFileOutputFormat2.configurePartitioner(job, splitPoints); } - /** - * Serialize column family to compression algorithm map to configuration. - * Invoked while configuring the MR job for incremental load. 
- * - * @param table to read the properties from - * @param conf to persist serialized values into - * @throws IOException - * on failure to read column family descriptors - */ - @edu.umd.cs.findbugs.annotations.SuppressWarnings( - value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") - @VisibleForTesting static void configureCompression(Table table, Configuration conf) throws IOException { - HFileOutputFormat2.configureCompression(table, conf); + HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor()); } /** @@ -177,7 +167,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, */ @VisibleForTesting static void configureBlockSize(Table table, Configuration conf) throws IOException { - HFileOutputFormat2.configureBlockSize(table, conf); + HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf); } /** @@ -191,7 +181,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, */ @VisibleForTesting static void configureBloomType(Table table, Configuration conf) throws IOException { - HFileOutputFormat2.configureBloomType(table, conf); + HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf); } /** @@ -206,6 +196,7 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, @VisibleForTesting static void configureDataBlockEncoding(Table table, Configuration conf) throws IOException { - HFileOutputFormat2.configureDataBlockEncoding(table, conf); + HTableDescriptor tableDescriptor = table.getTableDescriptor(); + HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java index 513e398..76266a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java @@ -364,7 +364,7 @@ public class HFileOutputFormat2 @Deprecated public static void configureIncrementalLoad(Job job, HTable table) throws IOException { - configureIncrementalLoad(job, table, table); + configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator()); } /** @@ -383,13 +383,32 @@ public class HFileOutputFormat2 */ public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) throws IOException { - configureIncrementalLoad(job, table, regionLocator, HFileOutputFormat2.class); + configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); } - static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator, - Class<? extends OutputFormat<?, ?>> cls) throws IOException { - Configuration conf = job.getConfiguration(); + /** + * Configure a MapReduce Job to perform an incremental load into the given + * table. 
This + * <ul> + * <li>Inspects the table to configure a total order partitioner</li> + * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li> + * <li>Sets the number of reduce tasks to match the current number of regions</li> + * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li> + * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or + * PutSortReducer)</li> + * </ul> + * The user should be sure to set the map output value class to either KeyValue or Put before + * running this function. + */ + public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, + RegionLocator regionLocator) throws IOException { + configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat2.class); + } + static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, + RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls) throws IOException, + UnsupportedEncodingException { + Configuration conf = job.getConfiguration(); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(KeyValue.class); job.setOutputFormatClass(cls); @@ -412,7 +431,7 @@ public class HFileOutputFormat2 KeyValueSerialization.class.getName()); // Use table's region boundaries for TOP split points. - LOG.info("Looking up current regions for table " + table.getName()); + LOG.info("Looking up current regions for table " + tableDescriptor.getTableName()); List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator); LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count"); @@ -420,14 +439,14 @@ public class HFileOutputFormat2 configurePartitioner(job, startKeys); // Set compression algorithms based on column families - configureCompression(table, conf); - configureBloomType(table, conf); - configureBlockSize(table, conf); - configureDataBlockEncoding(table, conf); + configureCompression(conf, tableDescriptor); + configureBloomType(tableDescriptor, conf); + configureBlockSize(tableDescriptor, conf); + configureDataBlockEncoding(tableDescriptor, conf); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); - LOG.info("Incremental table " + table.getName() + " output configured."); + LOG.info("Incremental table " + regionLocator.getName() + " output configured."); } public static void configureIncrementalLoadMap(Job job, Table table) throws IOException { @@ -438,10 +457,11 @@ public class HFileOutputFormat2 job.setOutputFormatClass(HFileOutputFormat2.class); // Set compression algorithms based on column families - configureCompression(table, conf); - configureBloomType(table, conf); - configureBlockSize(table, conf); - configureDataBlockEncoding(table, conf); + configureCompression(conf, table.getTableDescriptor()); + configureBloomType(table.getTableDescriptor(), conf); + configureBlockSize(table.getTableDescriptor(), conf); + HTableDescriptor tableDescriptor = table.getTableDescriptor(); + configureDataBlockEncoding(tableDescriptor, conf); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.initCredentials(job); @@ -590,10 +610,9 @@ public class HFileOutputFormat2 @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") @VisibleForTesting - static void configureCompression( - Table table, Configuration conf) throws IOException { + static void configureCompression(Configuration conf, HTableDescriptor 
tableDescriptor) + throws UnsupportedEncodingException { StringBuilder compressionConfigValue = new StringBuilder(); - HTableDescriptor tableDescriptor = table.getTableDescriptor(); if(tableDescriptor == null){ // could happen with mock table instance return; @@ -617,17 +636,16 @@ public class HFileOutputFormat2 /** * Serialize column family to block size map to configuration. * Invoked while configuring the MR job for incremental load. - * - * @param table to read the properties from + * @param tableDescriptor to read the properties from * @param conf to persist serialized values into + * * @throws IOException * on failure to read column family descriptors */ @VisibleForTesting - static void configureBlockSize( - Table table, Configuration conf) throws IOException { + static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf) + throws UnsupportedEncodingException { StringBuilder blockSizeConfigValue = new StringBuilder(); - HTableDescriptor tableDescriptor = table.getTableDescriptor(); if (tableDescriptor == null) { // could happen with mock table instance return; @@ -651,16 +669,15 @@ public class HFileOutputFormat2 /** * Serialize column family to bloom type map to configuration. * Invoked while configuring the MR job for incremental load. - * - * @param table to read the properties from + * @param tableDescriptor to read the properties from * @param conf to persist serialized values into + * * @throws IOException * on failure to read column family descriptors */ @VisibleForTesting - static void configureBloomType( - Table table, Configuration conf) throws IOException { - HTableDescriptor tableDescriptor = table.getTableDescriptor(); + static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf) + throws UnsupportedEncodingException { if (tableDescriptor == null) { // could happen with mock table instance return; @@ -694,9 +711,8 @@ public class HFileOutputFormat2 * on failure to read column family descriptors */ @VisibleForTesting - static void configureDataBlockEncoding(Table table, - Configuration conf) throws IOException { - HTableDescriptor tableDescriptor = table.getTableDescriptor(); + static void configureDataBlockEncoding(HTableDescriptor tableDescriptor, + Configuration conf) throws UnsupportedEncodingException { if (tableDescriptor == null) { // could happen with mock table instance return; http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java index 399d607..d1e886d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java @@ -41,13 +41,18 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; -import 
org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -441,15 +446,18 @@ public class Import { if (hfileOutPath != null) { job.setMapperClass(KeyValueImporter.class); - HTable table = new HTable(conf, tableName); - job.setReducerClass(KeyValueSortReducer.class); - Path outputDir = new Path(hfileOutPath); - FileOutputFormat.setOutputPath(job, outputDir); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - HFileOutputFormat2.configureIncrementalLoad(job, table, table); - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - com.google.common.base.Preconditions.class); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)){ + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), + com.google.common.base.Preconditions.class); + } } else { // No reducers. Just write straight to table. Call initTableReducerJob // because it sets up the TableOutputFormat. http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index 54e0034..90f2f0e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -20,17 +20,13 @@ package org.apache.hadoop.hbase.mapreduce; import static java.lang.String.format; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Set; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; @@ -40,11 +36,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import 
org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; @@ -59,9 +58,11 @@ import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; /** * Tool to import data from a TSV file. @@ -496,7 +497,8 @@ public class ImportTsv extends Configured implements Tool { throw new TableNotFoundException(errorMsg); } } - try (HTable table = (HTable)connection.getTable(tableName)) { + try (Table table = connection.getTable(tableName); + RegionLocator regionLocator = connection.getRegionLocator(tableName)) { boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false); // if no.strict is false then check column family if(!noStrict) { @@ -534,7 +536,8 @@ public class ImportTsv extends Configured implements Tool { job.setMapOutputValueClass(Put.class); job.setCombinerClass(PutCombiner.class); } - HFileOutputFormat2.configureIncrementalLoad(job, table, table); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), + regionLocator); } } else { if (!admin.tableExists(tableName)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index b4b6adc..c5af937 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -20,42 +20,21 @@ package org.apache.hadoop.hbase.mapreduce; import static java.lang.String.format; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import 
org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -64,10 +43,14 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.Table; @@ -95,12 +78,30 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Tool to load the output of HFileOutputFormat into an existing table. @@ -235,12 +236,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool { public void doBulkLoad(Path hfofDir, final HTable table) throws TableNotFoundException, IOException { - final HConnection conn = table.getConnection(); + doBulkLoad(hfofDir, table.getConnection().getAdmin(), table, table.getRegionLocator()); + } + + /** + * Perform a bulk load of the given directory into the given + * pre-existing table. This method is not threadsafe. 
+ * + * @param hfofDir the directory that was provided as the output path + * of a job using HFileOutputFormat + * @param table the table to load into + * @throws TableNotFoundException if table does not yet exist + */ + @SuppressWarnings("deprecation") + public void doBulkLoad(Path hfofDir, final Admin admin, Table table, + RegionLocator regionLocator) throws TableNotFoundException, IOException { - if (!conn.isTableAvailable(table.getName())) { - throw new TableNotFoundException("Table " + - Bytes.toStringBinary(table.getTableName()) + - "is not currently available."); + if (!admin.isTableAvailable(regionLocator.getName())) { + throw new TableNotFoundException("Table " + table.getName() + "is not currently available."); } // initialize thread pools @@ -276,7 +289,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { String msg = "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " + unmatchedFamilies + "; valid family names of table " - + Bytes.toString(table.getTableName()) + " are: " + familyNames; + + table.getName() + " are: " + familyNames; LOG.error(msg); throw new IOException(msg); } @@ -300,7 +313,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { // Assumes that region splits can happen while this occurs. while (!queue.isEmpty()) { // need to reload split keys each iteration. - final Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys(); + final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys(); if (count != 0) { LOG.info("Split occured while grouping HFiles, retry attempt " + + count + " with " + queue.size() + " files remaining to group or split"); @@ -323,7 +336,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { + " hfiles to one family of one region"); } - bulkLoadPhase(table, conn, pool, queue, regionGroups); + bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups); // NOTE: The next iteration's split / group could happen in parallel to // atomic bulkloads assuming that there are splits and no merges, and @@ -359,7 +372,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * them. Any failures are re-queued for another pass with the * groupOrSplitPhase. */ - protected void bulkLoadPhase(final Table table, final HConnection conn, + protected void bulkLoadPhase(final Table table, final Connection conn, ExecutorService pool, Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException { // atomically bulk load the groups. @@ -431,7 +444,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely * bulk load region targets. 
*/ - private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final HTable table, + private Multimap<ByteBuffer, LoadQueueItem> groupOrSplitPhase(final Table table, ExecutorService pool, Deque<LoadQueueItem> queue, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { // <region start key, LQI> need synchronized only within this scope of this @@ -524,7 +537,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @throws IOException */ protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups, - final LoadQueueItem item, final HTable table, + final LoadQueueItem item, final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { final Path hfilePath = item.hfilePath; @@ -569,18 +582,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool { */ if (indexForCallable < 0) { throw new IOException("The first region info for table " - + Bytes.toString(table.getTableName()) + + table.getName() + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); } else if ((indexForCallable == startEndKeys.getFirst().length - 1) && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) { throw new IOException("The last region info for table " - + Bytes.toString(table.getTableName()) + + table.getName() + " cann't be found in hbase:meta.Please use hbck tool to fix it first."); } else if (indexForCallable + 1 < startEndKeys.getFirst().length && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable], startEndKeys.getFirst()[indexForCallable + 1]) == 0)) { throw new IOException("The endkey of one region for table " - + Bytes.toString(table.getTableName()) + + table.getName() + " is not equal to the startkey of the next region in hbase:meta." 
+ "Please use hbck tool to fix it first."); } @@ -623,7 +636,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @return empty list if success, list of items to retry on recoverable * failure */ - protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn, + protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn, final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis) throws IOException { final List<Pair<byte[], String>> famPaths = http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java index b9a2db7..890cfdd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java @@ -155,14 +155,10 @@ public abstract class MultiTableInputFormatBase extends Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next(); TableName tableName = entry.getKey(); List<Scan> scanList = entry.getValue(); - Table table = null; - RegionLocator regionLocator = null; - Connection conn = null; - try{ - conn = ConnectionFactory.createConnection(context.getConfiguration()); - table = conn.getTable(tableName); - regionLocator = (RegionLocator) table; + try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration()); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { RegionSizeCalculator sizeCalculator = new RegionSizeCalculator( regionLocator, conn.getAdmin()); Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys(); @@ -210,10 +206,6 @@ public abstract class MultiTableInputFormatBase extends } } } - } finally { - if (null != table) table.close(); - if (null != regionLocator) regionLocator.close(); - if (null != conn) conn.close(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java index 4123467..3bf001b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java @@ -355,7 +355,7 @@ extends InputFormat<ImmutableBytesWritable, Result> { @Deprecated protected void setHTable(HTable table) throws IOException { this.table = table; - this.regionLocator = table; + this.regionLocator = table.getRegionLocator(); this.admin = table.getConnection().getAdmin(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index a487878..fb888aa 100644 
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -17,16 +17,8 @@ */ package org.apache.hadoop.hbase.mapreduce; -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Map; -import java.util.TreeMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; @@ -36,14 +28,19 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -52,6 +49,12 @@ import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Map; +import java.util.TreeMap; + /** * A tool to replay WAL files as a M/R job. 
* The WAL can be replayed for a set of tables or all tables, @@ -257,13 +260,17 @@ public class WALPlayer extends Configured implements Tool { if (tables.length != 1) { throw new IOException("Exactly one table must be specified for the bulk export option"); } - HTable table = new HTable(conf, TableName.valueOf(tables[0])); + TableName tableName = TableName.valueOf(tables[0]); job.setMapperClass(WALKeyValueMapper.class); job.setReducerClass(KeyValueSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputValueClass(KeyValue.class); - HFileOutputFormat2.configureIncrementalLoad(job, table, table); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); + } TableMapReduceUtil.addDependencyJars(job.getConfiguration(), com.google.common.base.Preconditions.class); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java index 92c4410..4f7c0a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSizeCalculator.java @@ -68,7 +68,7 @@ public class RegionSizeCalculator { public RegionSizeCalculator(HTable table) throws IOException { HBaseAdmin admin = new HBaseAdmin(table.getConfiguration()); try { - init(table, admin); + init(table.getRegionLocator(), admin); } finally { admin.close(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index ee67629..74bcfa2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2128,56 +2128,55 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { final byte[] columnFamily, byte [][] startKeys) throws IOException { Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); - Table meta = new HTable(c, TableName.META_TABLE_NAME); - HTableDescriptor htd = table.getTableDescriptor(); - if(!htd.hasFamily(columnFamily)) { - HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); - htd.addFamily(hcd); - } - // remove empty region - this is tricky as the mini cluster during the test - // setup already has the "<tablename>,,123456789" row with an empty start - // and end key. Adding the custom regions below adds those blindly, - // including the new start region from empty to "bbb". 
lg - List<byte[]> rows = getMetaTableRows(htd.getTableName()); - String regionToDeleteInFS = table - .getRegionsInRange(Bytes.toBytes(""), Bytes.toBytes("")).get(0) - .getRegionInfo().getEncodedName(); - List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length); - // add custom ones - int count = 0; - for (int i = 0; i < startKeys.length; i++) { - int j = (i + 1) % startKeys.length; - HRegionInfo hri = new HRegionInfo(table.getName(), - startKeys[i], startKeys[j]); - MetaTableAccessor.addRegionToMeta(meta, hri); - newRegions.add(hri); - count++; - } - // see comment above, remove "old" (or previous) single region - for (byte[] row : rows) { - LOG.info("createMultiRegions: deleting meta row -> " + - Bytes.toStringBinary(row)); - meta.delete(new Delete(row)); - } - // remove the "old" region from FS - Path tableDir = new Path(getDefaultRootDirPath().toString() - + System.getProperty("file.separator") + htd.getTableName() - + System.getProperty("file.separator") + regionToDeleteInFS); - FileSystem.get(c).delete(tableDir, true); - // flush cache of regions - HConnection conn = table.getConnection(); - conn.clearRegionCache(); - // assign all the new regions IF table is enabled. - Admin admin = getHBaseAdmin(); - if (admin.isTableEnabled(table.getName())) { - for(HRegionInfo hri : newRegions) { - admin.assign(hri.getRegionName()); + try (Table meta = new HTable(c, TableName.META_TABLE_NAME)) { + HTableDescriptor htd = table.getTableDescriptor(); + if(!htd.hasFamily(columnFamily)) { + HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); + htd.addFamily(hcd); + } + // remove empty region - this is tricky as the mini cluster during the test + // setup already has the "<tablename>,,123456789" row with an empty start + // and end key. Adding the custom regions below adds those blindly, + // including the new start region from empty to "bbb". lg + List<byte[]> rows = getMetaTableRows(htd.getTableName()); + String regionToDeleteInFS = table + .getRegionsInRange(Bytes.toBytes(""), Bytes.toBytes("")).get(0) + .getRegionInfo().getEncodedName(); + List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length); + // add custom ones + int count = 0; + for (int i = 0; i < startKeys.length; i++) { + int j = (i + 1) % startKeys.length; + HRegionInfo hri = new HRegionInfo(table.getName(), + startKeys[i], startKeys[j]); + MetaTableAccessor.addRegionToMeta(meta, hri); + newRegions.add(hri); + count++; + } + // see comment above, remove "old" (or previous) single region + for (byte[] row : rows) { + LOG.info("createMultiRegions: deleting meta row -> " + + Bytes.toStringBinary(row)); + meta.delete(new Delete(row)); + } + // remove the "old" region from FS + Path tableDir = new Path(getDefaultRootDirPath().toString() + + System.getProperty("file.separator") + htd.getTableName() + + System.getProperty("file.separator") + regionToDeleteInFS); + FileSystem.get(c).delete(tableDir, true); + // flush cache of regions + HConnection conn = table.getConnection(); + conn.clearRegionCache(); + // assign all the new regions IF table is enabled. 
+ Admin admin = conn.getAdmin(); + if (admin.isTableEnabled(table.getName())) { + for(HRegionInfo hri : newRegions) { + admin.assign(hri.getRegionName()); + } } - } - - meta.close(); - return count; + return count; + } } /** @@ -3453,10 +3452,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } public static int getMetaRSPort(Configuration conf) throws IOException { - RegionLocator table = new HTable(conf, TableName.META_TABLE_NAME); - HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes("")); - table.close(); - return hloc.getPort(); + try (Connection c = ConnectionFactory.createConnection(); + RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) { + return locator.getRegionLocation(Bytes.toBytes("")).getPort(); + } } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java index 1f83201..5a5005a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java @@ -26,8 +26,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -68,7 +66,7 @@ public class TestRegionRebalancing { private static final byte[] FAMILY_NAME = Bytes.toBytes("col"); public static final Log LOG = LogFactory.getLog(TestRegionRebalancing.class); private final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private RegionLocator table; + private RegionLocator regionLocator; private HTableDescriptor desc; private String balancerName; @@ -100,59 +98,59 @@ public class TestRegionRebalancing { @SuppressWarnings("deprecation") public void testRebalanceOnRegionServerNumberChange() throws IOException, InterruptedException { - Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); - Admin admin = connection.getAdmin(); - admin.createTable(this.desc, Arrays.copyOfRange(HBaseTestingUtility.KEYS, - 1, HBaseTestingUtility.KEYS.length)); - this.table = new HTable(UTIL.getConfiguration(), this.desc.getTableName()); - - MetaTableAccessor.fullScanMetaAndPrint(admin.getConnection()); - - assertEquals("Test table should have right number of regions", - HBaseTestingUtility.KEYS.length, - this.table.getStartKeys().length); - - // verify that the region assignments are balanced to start out - assertRegionsAreBalanced(); - - // add a region server - total of 2 - LOG.info("Started second server=" + - UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - UTIL.getHBaseCluster().getMaster().balance(); - assertRegionsAreBalanced(); - - // On a balanced cluster, calling balance() should return true - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - - // if we add a server, then the balance() call should return true - // add a region server - total of 3 - LOG.info("Started third server=" + + 
try(Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration()); + Admin admin = connection.getAdmin()) { + admin.createTable(this.desc, Arrays.copyOfRange(HBaseTestingUtility.KEYS, + 1, HBaseTestingUtility.KEYS.length)); + this.regionLocator = connection.getRegionLocator(this.desc.getTableName()); + + MetaTableAccessor.fullScanMetaAndPrint(admin.getConnection()); + + assertEquals("Test table should have right number of regions", + HBaseTestingUtility.KEYS.length, + this.regionLocator.getStartKeys().length); + + // verify that the region assignments are balanced to start out + assertRegionsAreBalanced(); + + // add a region server - total of 2 + LOG.info("Started second server=" + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - assertRegionsAreBalanced(); - - // kill a region server - total of 2 - LOG.info("Stopped third server=" + UTIL.getHBaseCluster().stopRegionServer(2, false)); - UTIL.getHBaseCluster().waitOnRegionServer(2); - UTIL.getHBaseCluster().getMaster().balance(); - assertRegionsAreBalanced(); - - // start two more region servers - total of 4 - LOG.info("Readding third server=" + - UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - LOG.info("Added fourth server=" + - UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - assertRegionsAreBalanced(); - - for (int i = 0; i < 6; i++){ - LOG.info("Adding " + (i + 5) + "th region server"); - UTIL.getHBaseCluster().startRegionServer(); + UTIL.getHBaseCluster().getMaster().balance(); + assertRegionsAreBalanced(); + + // On a balanced cluster, calling balance() should return true + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + + // if we add a server, then the balance() call should return true + // add a region server - total of 3 + LOG.info("Started third server=" + + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + assertRegionsAreBalanced(); + + // kill a region server - total of 2 + LOG.info("Stopped third server=" + UTIL.getHBaseCluster().stopRegionServer(2, false)); + UTIL.getHBaseCluster().waitOnRegionServer(2); + UTIL.getHBaseCluster().getMaster().balance(); + assertRegionsAreBalanced(); + + // start two more region servers - total of 4 + LOG.info("Readding third server=" + + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); + LOG.info("Added fourth server=" + + UTIL.getHBaseCluster().startRegionServer().getRegionServer().getServerName()); + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + assertRegionsAreBalanced(); + + for (int i = 0; i < 6; i++){ + LOG.info("Adding " + (i + 5) + "th region server"); + UTIL.getHBaseCluster().startRegionServer(); + } + assert(UTIL.getHBaseCluster().getMaster().balance() == true); + assertRegionsAreBalanced(); + regionLocator.close(); } - assert(UTIL.getHBaseCluster().getMaster().balance() == true); - assertRegionsAreBalanced(); - table.close(); - admin.close(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index a99b047..e0bf2e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** * {@link ClusterConnection} testing utility. @@ -68,6 +70,23 @@ public class HConnectionTestingUtility { } /** + * @param connection + */ + private static void mockRegionLocator(final HConnectionImplementation connection) { + try { + Mockito.when(connection.getRegionLocator(Mockito.any(TableName.class))).thenAnswer( + new Answer<RegionLocator>() { + @Override + public RegionLocator answer(InvocationOnMock invocation) throws Throwable { + TableName tableName = (TableName) invocation.getArguments()[0]; + return new HRegionLocator(tableName, connection); + } + }); + } catch (IOException e) { + } + } + + /** * Calls {@link #getMockedConnection(Configuration)} and then mocks a few * more of the popular {@link ClusterConnection} methods so they do 'normal' * operation (see return doc below for list). Be sure to shutdown the @@ -107,6 +126,7 @@ public class HConnectionTestingUtility { Mockito.doNothing().when(c).close(); // Make it so we return a particular location when asked. final HRegionLocation loc = new HRegionLocation(hri, sn); + mockRegionLocator(c); Mockito.when(c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean())). 
thenReturn(loc); http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index bc805fe..7c2a1ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -26,24 +26,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; @@ -107,6 +89,22 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + /** * Run tests that use the HBase clients; {@link HTable}. * Sets up the HBase mini cluster once at start and runs through all client tests. @@ -5203,40 +5201,41 @@ public class TestFromClientSide { TableName TABLE = TableName.valueOf("testNonCachedGetRegionLocation"); byte [] family1 = Bytes.toBytes("f1"); byte [] family2 = Bytes.toBytes("f2"); - HTable table = TEST_UTIL.createTable(TABLE, new byte[][] {family1, family2}, 10); - Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); - Map <HRegionInfo, ServerName> regionsMap = table.getRegionLocations(); - assertEquals(1, regionsMap.size()); - HRegionInfo regionInfo = regionsMap.keySet().iterator().next(); - ServerName addrBefore = regionsMap.get(regionInfo); - // Verify region location before move. - HRegionLocation addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); - HRegionLocation addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); - - assertEquals(addrBefore.getPort(), addrCache.getPort()); - assertEquals(addrBefore.getPort(), addrNoCache.getPort()); - - ServerName addrAfter = null; - // Now move the region to a different server. - for (int i = 0; i < SLAVES; i++) { - HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(i); - ServerName addr = regionServer.getServerName(); - if (addr.getPort() != addrBefore.getPort()) { - admin.move(regionInfo.getEncodedNameAsBytes(), - Bytes.toBytes(addr.toString())); - // Wait for the region to move. 
- Thread.sleep(5000); - addrAfter = addr; - break; + try (HTable table = TEST_UTIL.createTable(TABLE, new byte[][] {family1, family2}, 10); + Admin admin = new HBaseAdmin(TEST_UTIL.getConfiguration())) { + Map <HRegionInfo, ServerName> regionsMap = table.getRegionLocations(); + assertEquals(1, regionsMap.size()); + HRegionInfo regionInfo = regionsMap.keySet().iterator().next(); + ServerName addrBefore = regionsMap.get(regionInfo); + // Verify region location before move. + HRegionLocation addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); + HRegionLocation addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); + + assertEquals(addrBefore.getPort(), addrCache.getPort()); + assertEquals(addrBefore.getPort(), addrNoCache.getPort()); + + ServerName addrAfter = null; + // Now move the region to a different server. + for (int i = 0; i < SLAVES; i++) { + HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(i); + ServerName addr = regionServer.getServerName(); + if (addr.getPort() != addrBefore.getPort()) { + admin.move(regionInfo.getEncodedNameAsBytes(), + Bytes.toBytes(addr.toString())); + // Wait for the region to move. + Thread.sleep(5000); + addrAfter = addr; + break; + } } + + // Verify the region was moved. + addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); + addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); + assertNotNull(addrAfter); + assertTrue(addrAfter.getPort() != addrCache.getPort()); + assertEquals(addrAfter.getPort(), addrNoCache.getPort()); } - - // Verify the region was moved. - addrCache = table.getRegionLocation(regionInfo.getStartKey(), false); - addrNoCache = table.getRegionLocation(regionInfo.getStartKey(), true); - assertNotNull(addrAfter); - assertTrue(addrAfter.getPort() != addrCache.getPort()); - assertEquals(addrAfter.getPort(), addrNoCache.getPort()); } @Test @@ -6241,10 +6240,13 @@ public class TestFromClientSide { HColumnDescriptor fam = new HColumnDescriptor(FAMILY); htd.addFamily(fam); byte[][] KEYS = HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE; - TEST_UTIL.getHBaseAdmin().createTable(htd, KEYS); - List<HRegionInfo> regions = TEST_UTIL.getHBaseAdmin().getTableRegions(htd.getTableName()); + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + admin.createTable(htd, KEYS); + List<HRegionInfo> regions = admin.getTableRegions(htd.getTableName()); - for (int regionReplication = 1; regionReplication < 4 ; regionReplication++) { + HRegionLocator locator = + (HRegionLocator) admin.getConnection().getRegionLocator(htd.getTableName()); + for (int regionReplication = 1; regionReplication < 4; regionReplication++) { List<RegionLocations> regionLocations = new ArrayList<RegionLocations>(); // mock region locations coming from meta with multiple replicas @@ -6256,10 +6258,7 @@ public class TestFromClientSide { regionLocations.add(new RegionLocations(arr)); } - HTable table = spy(new HTable(TEST_UTIL.getConfiguration(), htd.getTableName())); - when(table.listRegionLocations()).thenReturn(regionLocations); - - Pair<byte[][], byte[][]> startEndKeys = table.getStartEndKeys(); + Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys(regionLocations); assertEquals(KEYS.length + 1, startEndKeys.getFirst().length); @@ -6269,9 +6268,6 @@ public class TestFromClientSide { assertArrayEquals(startKey, startEndKeys.getFirst()[i]); assertArrayEquals(endKey, startEndKeys.getSecond()[i]); } - - table.close(); } } - } 
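Every hunk in the client and test changes above applies the same migration: region metadata lookups move off HTable (getRegionLocation, getStartKeys, getRegionLocations) and onto the RegionLocator obtained from Connection.getRegionLocator(TableName). Below is a minimal sketch of the resulting client pattern, not code from this commit; the class name, the table name "t1", and the row key are illustrative, and a reachable cluster with default configuration is assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionLocatorSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("t1");   // illustrative table name
    // The locator comes from the Connection, so no Table/HTable handle is
    // needed just to inspect region boundaries or placement.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         RegionLocator locator = conn.getRegionLocator(tableName)) {
      HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("row-0"));
      System.out.println("row-0 is served by " + loc.getServerName());
      System.out.println("region count: " + locator.getAllRegionLocations().size());
    }
  }
}

Both handles are Closeable, which is why the converted call sites above (WALPlayer, HBaseTestingUtility.getMetaRSPort, TestRegionRebalancing, TestFromClientSide) switch to try-with-resources instead of pairing new HTable(...) with an explicit close().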
http://git-wip-us.apache.org/repos/asf/hbase/blob/2c4fe03e/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java index 8ed8464..afe7e40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java @@ -335,9 +335,10 @@ public class TestHFileOutputFormat { public void testJobConfiguration() throws Exception { Job job = new Job(util.getConfiguration()); job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration")); - HTable table = Mockito.mock(HTable.class); - setupMockStartKeys(table); - HFileOutputFormat.configureIncrementalLoad(job, table); + HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class); + RegionLocator regionLocator = Mockito.mock(RegionLocator.class); + setupMockStartKeys(regionLocator); + HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator); assertEquals(job.getNumReduceTasks(), 4); } @@ -467,12 +468,13 @@ public class TestHFileOutputFormat { MutationSerialization.class.getName(), ResultSerialization.class.getName(), KeyValueSerialization.class.getName()); setupRandomGeneratorMapper(job); - HFileOutputFormat.configureIncrementalLoad(job, table); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), + table.getRegionLocator()); FileOutputFormat.setOutputPath(job, outDir); Assert.assertFalse( util.getTestFileSystem().exists(outDir)) ; - assertEquals(table.getRegionLocations().size(), job.getNumReduceTasks()); + assertEquals(table.getRegionLocator().getAllRegionLocations().size(), job.getNumReduceTasks()); assertTrue(job.waitForCompletion(true)); } @@ -769,14 +771,14 @@ public class TestHFileOutputFormat { return familyToDataBlockEncoding; } - private void setupMockStartKeys(RegionLocator table) throws IOException { + private void setupMockStartKeys(RegionLocator regionLocator) throws IOException { byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"), Bytes.toBytes("ggg"), Bytes.toBytes("zzz") }; - Mockito.doReturn(mockKeys).when(table).getStartKeys(); + Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys(); } /** @@ -792,6 +794,7 @@ public class TestHFileOutputFormat { // Setup table descriptor HTable table = Mockito.mock(HTable.class); + RegionLocator regionLocator = Mockito.mock(RegionLocator.class); HTableDescriptor htd = new HTableDescriptor(TABLE_NAME); Mockito.doReturn(htd).when(table).getTableDescriptor(); for (HColumnDescriptor hcd: this.util.generateColumnDescriptors()) { @@ -799,7 +802,7 @@ public class TestHFileOutputFormat { } // set up the table to return some mock keys - setupMockStartKeys(table); + setupMockStartKeys(regionLocator); try { // partial map red setup to get an operational writer for testing @@ -809,7 +812,7 @@ public class TestHFileOutputFormat { Job job = new Job(conf, "testLocalMRIncrementalLoad"); job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings")); setupRandomGeneratorMapper(job); - HFileOutputFormat.configureIncrementalLoad(job, table); + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); FileOutputFormat.setOutputPath(job, dir); context = 
createTestTaskAttemptContext(job); HFileOutputFormat hof = new HFileOutputFormat();
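The TestHFileOutputFormat hunks exercise the other half of the change: bulk-load jobs are now configured through HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator) rather than the HTable-based overload, and the converted assertion checks that the reduce-task count equals the region count reported by the locator. The following job-setup sketch using that overload is illustrative, not from the commit; the class and method names are placeholders, mapper and input-format configuration are omitted, and only the API pieces shown in the hunks above are assumed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadJobSketch {
  public static Job newBulkLoadJob(Configuration conf, TableName tableName, Path outDir)
      throws Exception {
    Job job = Job.getInstance(conf, "bulkload-" + tableName.getNameAsString());
    FileOutputFormat.setOutputPath(job, outDir);
    // Descriptor and locator are fetched through a short-lived Connection and
    // closed again before the job is submitted, mirroring the WALPlayer hunk.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(tableName);
         RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Derives the TotalOrderPartitioner split points from the locator's
      // start keys and sets one reduce task per region of the target table.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), locator);
    }
    return job;   // caller still sets the mapper and input format
  }
}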