Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Wed Apr 27 23:12:42 2011 @@ -22,8 +22,16 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.lang.reflect.Proxy; import java.lang.reflect.UndeclaredThrowableException; -import java.util.*; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -46,11 +54,17 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MasterAddressTracker; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.ipc.*; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.ipc.ExecRPCInvoker; +import org.apache.hadoop.hbase.ipc.HBaseRPC; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import 
org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.SoftValueSortedMap; @@ -243,7 +257,8 @@ public class HConnectionManager { private final Map<String, HRegionInterface> servers = new ConcurrentHashMap<String, HRegionInterface>(); - private final ConcurrentHashMap<String, String> connectionLock = new ConcurrentHashMap<String, String>(); + private final ConcurrentHashMap<String, String> connectionLock = + new ConcurrentHashMap<String, String>(); /** * Map of table to table {@link HRegionLocation}s. The table key is made @@ -340,7 +355,7 @@ public class HConnectionManager { } } - HServerAddress masterLocation = null; + ServerName sn = null; synchronized (this.masterLock) { for (int tries = 0; !this.closed && @@ -349,8 +364,8 @@ public class HConnectionManager { tries++) { try { - masterLocation = masterAddressTracker.getMasterAddress(); - if(masterLocation == null) { + sn = masterAddressTracker.getMasterAddress(); + if (sn == null) { LOG.info("ZooKeeper available but no active master location found"); throw new MasterNotRunningException(); } @@ -358,9 +373,11 @@ public class HConnectionManager { if (clusterId.hasId()) { conf.set(HConstants.CLUSTER_ID, clusterId.getId()); } + InetSocketAddress isa = + new InetSocketAddress(sn.getHostname(), sn.getPort()); HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy( - HMasterInterface.class, HMasterInterface.VERSION, - masterLocation.getInetSocketAddress(), this.conf, this.rpcTimeout); + HMasterInterface.class, HMasterInterface.VERSION, isa, this.conf, + this.rpcTimeout); if (tryMaster.isMasterRunning()) { this.master = tryMaster; @@ -391,10 +408,10 @@ public class HConnectionManager { this.masterChecked = true; } if (this.master == null) { - if (masterLocation == null) { + if (sn == null) { throw new MasterNotRunningException(); } - throw new MasterNotRunningException(masterLocation.toString()); + throw 
new MasterNotRunningException(sn.toString()); } return this.master; } @@ -577,12 +594,13 @@ public class HConnectionManager { if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { try { - HServerAddress hsa = + ServerName servername = this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout); LOG.debug("Lookedup root region location, connection=" + this + - "; hsa=" + hsa); - if (hsa == null) return null; - return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, hsa); + "; serverName=" + ((servername == null)? "": servername.toString())); + if (servername == null) return null; + return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, + servername.getHostname(), servername.getPort()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; @@ -631,11 +649,14 @@ public class HConnectionManager { if (value == null) { return true; // don't cache it } - final String serverAddress = Bytes.toString(value); - + final String hostAndPort = Bytes.toString(value); + String hostname = Addressing.parseHostname(hostAndPort); + int port = Addressing.parsePort(hostAndPort); + value = result.getValue(HConstants.CATALOG_FAMILY, + HConstants.STARTCODE_QUALIFIER); // instantiate the location - HRegionLocation loc = new HRegionLocation(regionInfo, - new HServerAddress(serverAddress)); + HRegionLocation loc = + new HRegionLocation(regionInfo, hostname, port); // cache this meta entry cacheLocation(tableName, loc); } @@ -690,7 +711,7 @@ public class HConnectionManager { // If null still, go around again. 
if (metaLocation == null) continue; HRegionInterface server = - getHRegionConnection(metaLocation.getServerAddress()); + getHRegionConnection(metaLocation.getHostname(), metaLocation.getPort()); Result regionInfoRow = null; // This block guards against two threads trying to load the meta @@ -725,7 +746,7 @@ public class HConnectionManager { if (regionInfoRow == null) { throw new TableNotFoundException(Bytes.toString(tableName)); } - byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY, + byte [] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); if (value == null || value.length == 0) { throw new IOException("HRegionInfo was null or empty in " + @@ -746,19 +767,22 @@ public class HConnectionManager { value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); - String serverAddress = ""; - if(value != null) { - serverAddress = Bytes.toString(value); + String hostAndPort = ""; + if (value != null) { + hostAndPort = Bytes.toString(value); } - if (serverAddress.equals("")) { + if (hostAndPort.equals("")) { throw new NoServerForRegionException("No server address listed " + "in " + Bytes.toString(parentTable) + " for region " + regionInfo.getRegionNameAsString()); } - // instantiate the location - location = new HRegionLocation(regionInfo, - new HServerAddress(serverAddress)); + // Instantiate the location + String hostname = Addressing.parseHostname(hostAndPort); + int port = Addressing.parsePort(hostAndPort); + value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + location = new HRegionLocation(regionInfo, hostname, port); cacheLocation(tableName, location); return location; } catch (TableNotFoundException e) { @@ -936,14 +960,48 @@ public class HConnectionManager { } } - public HRegionInterface getHRegionConnection( - HServerAddress regionServer, boolean getMaster) + public HRegionInterface getHRegionConnection(HServerAddress hsa) throws IOException { 
- if (getMaster) { - getMaster(); - } + return getHRegionConnection(hsa, false); + } + + @Override + public HRegionInterface getHRegionConnection(final String hostname, + final int port) + throws IOException { + return getHRegionConnection(hostname, port, false); + } + + public HRegionInterface getHRegionConnection(HServerAddress hsa, + boolean master) + throws IOException { + return getHRegionConnection(null, -1, hsa.getInetSocketAddress(), master); + } + + @Override + public HRegionInterface getHRegionConnection(final String hostname, + final int port, final boolean master) + throws IOException { + return getHRegionConnection(hostname, port, null, master); + } + + /** + * Either the passed <code>isa</code> is null or <code>hostname</code> + * can be but not both. + * @param hostname + * @param port + * @param isa + * @param master + * @return Proxy. + * @throws IOException + */ + HRegionInterface getHRegionConnection(final String hostname, final int port, + final InetSocketAddress isa, final boolean master) + throws IOException { + if (master) getMaster(); HRegionInterface server; - String rsName = regionServer.toString(); + String rsName = isa != null? + isa.toString(): Addressing.createHostAndPortStr(hostname, port); // See if we already have a connection (common case) server = this.servers.get(rsName); if (server == null) { @@ -958,12 +1016,15 @@ public class HConnectionManager { if (clusterId.hasId()) { conf.set(HConstants.CLUSTER_ID, clusterId.getId()); } + // Only create isa when we need to. + InetSocketAddress address = isa != null? isa: + new InetSocketAddress(hostname, port); // definitely a cache miss. 
establish an RPC for this RS server = (HRegionInterface) HBaseRPC.waitForProxy( serverInterfaceClass, HRegionInterface.VERSION, - regionServer.getInetSocketAddress(), this.conf, + address, this.conf, this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); - this.servers.put(rsName, server); + this.servers.put(address.toString(), server); } catch (RemoteException e) { LOG.warn("RemoteException connecting to RS", e); // Throw what the RemoteException was carrying. @@ -975,12 +1036,6 @@ public class HConnectionManager { return server; } - public HRegionInterface getHRegionConnection( - HServerAddress regionServer) - throws IOException { - return getHRegionConnection(regionServer, false); - } - /** * Get the ZooKeeper instance for this TableServers instance. * @@ -1065,10 +1120,8 @@ public class HConnectionManager { this.closed = true; } - private <R> Callable<MultiResponse> createCallable( - final HServerAddress address, - final MultiAction<R> multi, - final byte [] tableName) { + private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc, + final MultiAction<R> multi, final byte [] tableName) { final HConnection connection = this; return new Callable<MultiResponse>() { public MultiResponse call() throws IOException { @@ -1079,7 +1132,8 @@ public class HConnectionManager { } @Override public void instantiateServer(boolean reload) throws IOException { - server = connection.getHRegionConnection(address); + server = + connection.getHRegionConnection(loc.getHostname(), loc.getPort()); } } ); @@ -1191,8 +1245,10 @@ public class HConnectionManager { } // Keep track of the most recent servers for any given item for better - // exceptional reporting. - HServerAddress [] lastServers = new HServerAddress[results.length]; + // exceptional reporting. We keep HRegionLocation to save on parsing. + // Later below when we use lastServers, we'll pull what we need from + // lastServers. 
+ HRegionLocation [] lastServers = new HRegionLocation[results.length]; List<Row> workingList = new ArrayList<Row>(list); boolean retry = true; // count that helps presize actions array @@ -1208,43 +1264,41 @@ public class HConnectionManager { Thread.sleep(sleepTime); } // step 1: break up into regionserver-sized chunks and build the data structs - Map<HServerAddress, MultiAction<R>> actionsByServer = - new HashMap<HServerAddress, MultiAction<R>>(); + Map<HRegionLocation, MultiAction<R>> actionsByServer = + new HashMap<HRegionLocation, MultiAction<R>>(); for (int i = 0; i < workingList.size(); i++) { Row row = workingList.get(i); if (row != null) { HRegionLocation loc = locateRegion(tableName, row.getRow(), true); - HServerAddress address = loc.getServerAddress(); byte[] regionName = loc.getRegionInfo().getRegionName(); - MultiAction<R> actions = actionsByServer.get(address); + MultiAction<R> actions = actionsByServer.get(loc); if (actions == null) { actions = new MultiAction<R>(); - actionsByServer.put(address, actions); + actionsByServer.put(loc, actions); } Action<R> action = new Action<R>(regionName, row, i); - lastServers[i] = address; + lastServers[i] = loc; actions.add(regionName, action); } } // step 2: make the requests - Map<HServerAddress,Future<MultiResponse>> futures = - new HashMap<HServerAddress, Future<MultiResponse>>( + Map<HRegionLocation, Future<MultiResponse>> futures = + new HashMap<HRegionLocation, Future<MultiResponse>>( actionsByServer.size()); - for (Entry<HServerAddress, MultiAction<R>> e - : actionsByServer.entrySet()) { + for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) { futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName))); } // step 3: collect the failures and successes and prepare for retry - for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer + for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer : futures.entrySet()) { - HServerAddress 
address = responsePerServer.getKey(); + HRegionLocation loc = responsePerServer.getKey(); try { Future<MultiResponse> future = responsePerServer.getValue(); @@ -1252,7 +1306,8 @@ public class HConnectionManager { if (resp == null) { // Entire server failed - LOG.debug("Failed all for server: " + address + ", removing from cache"); + LOG.debug("Failed all for server: " + loc.getHostnamePort() + + ", removing from cache"); continue; } @@ -1277,7 +1332,7 @@ public class HConnectionManager { } } } catch (ExecutionException e) { - LOG.debug("Failed all from " + address, e); + LOG.debug("Failed all from " + loc, e); } } @@ -1320,13 +1375,13 @@ public class HConnectionManager { List<Throwable> exceptions = new ArrayList<Throwable>(actionCount); List<Row> actions = new ArrayList<Row>(actionCount); - List<HServerAddress> addresses = new ArrayList<HServerAddress>(actionCount); + List<String> addresses = new ArrayList<String>(actionCount); for (int i = 0 ; i < results.length; i++) { if (results[i] == null || results[i] instanceof Throwable) { exceptions.add((Throwable)results[i]); actions.add(list.get(i)); - addresses.add(lastServers[i]); + addresses.add(lastServers[i].getHostnamePort()); } } @@ -1418,11 +1473,14 @@ public class HConnectionManager { return !regionCachePrefetchDisabledTables.contains(Bytes.mapKey(tableName)); } - public void prewarmRegionCache(final byte[] tableName, - final Map<HRegionInfo, HServerAddress> regions) { + @Override + public void prewarmRegionCache(byte[] tableName, + Map<HRegionInfo, HServerAddress> regions) { for (Map.Entry<HRegionInfo, HServerAddress> e : regions.entrySet()) { + HServerAddress hsa = e.getValue(); + if (hsa == null || hsa.getInetSocketAddress() == null) continue; cacheLocation(tableName, - new HRegionLocation(e.getKey(), e.getValue())); + new HRegionLocation(e.getKey(), hsa.getHostname(), hsa.getPort())); } }
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Apr 27 23:12:42 2011 @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; @@ -49,12 +50,14 @@ import org.apache.hadoop.hbase.HServerAd import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.ExecRPCInvoker; +import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Writables; @@ -283,6 +286,7 @@ public class HTable implements HTableInt * <em>INTERNAL</em> Used by unit tests and tools to do low-level * manipulations. * @return An HConnection instance. + * @deprecated This method will be changed from public to package protected. */ // TODO(tsuna): Remove this. Unit tests shouldn't require public helpers. 
public HConnection getConnection() { @@ -378,10 +382,9 @@ public class HTable implements HTableInt /** * Gets all the regions and their address for this table. - * <p> - * This is mainly useful for the MapReduce integration. * @return A map of HRegionInfo with it's server address * @throws IOException if a remote or network exception occurs + * @deprecated Use {@link #getRegionLocations()} or {@link #getStartEndKeys()} */ public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException { final Map<HRegionInfo, HServerAddress> regionMap = @@ -401,8 +404,8 @@ public class HTable implements HTableInt byte [] value = rowResult.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); if (value != null && value.length > 0) { - String address = Bytes.toString(value); - server = new HServerAddress(address); + String hostAndPort = Bytes.toString(value); + server = new HServerAddress(Addressing.createInetSocketAddressFromHostAndPortStr(hostAndPort)); } if (!(info.isOffline() || info.isSplit())) { @@ -417,6 +420,17 @@ public class HTable implements HTableInt } /** + * Gets all the regions and their address for this table. + * <p> + * This is mainly useful for the MapReduce integration. + * @return A map of HRegionInfo with it's server address + * @throws IOException if a remote or network exception occurs + */ + public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException { + return MetaScanner.allTableRegions(getConfiguration(), getTableName(), false); + } + + /** * Save the passed region information and the table's regions * cache. 
* <p> Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java Wed Apr 27 23:12:42 2011 @@ -23,13 +23,20 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; @@ -225,8 +232,7 @@ public class MetaScanner { public static List<HRegionInfo> listAllRegions(Configuration conf, final boolean offlined) throws IOException { final List<HRegionInfo> regions = new ArrayList<HRegionInfo>(); - MetaScannerVisitor visitor = - new MetaScannerVisitor() { + MetaScannerVisitor visitor = new MetaScannerVisitor() { @Override public boolean processRow(Result result) throws IOException { if (result == null || result.isEmpty()) { @@ -250,6 +256,51 @@ public class MetaScanner { } /** + * Lists all of the table regions currently in META. + * @param conf + * @param offlined True if we are to include offlined regions, false and we'll + * leave out offlined regions from returned list. 
+ * @return Map of all user-space regions to servers + * @throws IOException + */ + public static NavigableMap<HRegionInfo, ServerName> allTableRegions(Configuration conf, final byte [] tablename, final boolean offlined) + throws IOException { + final NavigableMap<HRegionInfo, ServerName> regions = + new TreeMap<HRegionInfo, ServerName>(); + MetaScannerVisitor visitor = new MetaScannerVisitor() { + @Override + public boolean processRow(Result rowResult) throws IOException { + HRegionInfo info = Writables.getHRegionInfo( + rowResult.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER)); + if (!(Bytes.equals(info.getTableDesc().getName(), tablename))) { + return false; + } + byte [] value = rowResult.getValue(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + String hostAndPort = null; + if (value != null && value.length > 0) { + hostAndPort = Bytes.toString(value); + } + value = rowResult.getValue(HConstants.CATALOG_FAMILY, + HConstants.STARTCODE_QUALIFIER); + long startcode = -1L; + if (value != null && value.length > 0) startcode = Bytes.toLong(value); + if (!(info.isOffline() || info.isSplit())) { + ServerName sn = null; + if (hostAndPort != null && hostAndPort.length() > 0) { + sn = new ServerName(hostAndPort, startcode); + } + regions.put(new UnmodifyableHRegionInfo(info), sn); + } + return true; + } + }; + metaScan(conf, visitor); + return regions; + } + + /** * Visitor class called to process each row of the .META. 
table */ public interface MetaScannerVisitor { @@ -264,4 +315,4 @@ public class MetaScanner { */ public boolean processRow(Result rowResult) throws IOException; } -} +} \ No newline at end of file Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/RetriesExhaustedWithDetailsException.java Wed Apr 27 23:12:42 2011 @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.util.Addressing; import java.util.Collection; import java.util.HashMap; @@ -39,22 +40,23 @@ import java.util.Set; * known server addresses via {@link #getNumExceptions()} and * {@link #getCause(int)}, {@link #getRow(int)} and {@link #getAddress(int)}. 
*/ -public class RetriesExhaustedWithDetailsException extends RetriesExhaustedException { - +@SuppressWarnings("serial") +public class RetriesExhaustedWithDetailsException +extends RetriesExhaustedException { List<Throwable> exceptions; List<Row> actions; - List<HServerAddress> addresses; + List<String> hostnameAndPort; public RetriesExhaustedWithDetailsException(List<Throwable> exceptions, List<Row> actions, - List<HServerAddress> addresses) { + List<String> hostnameAndPort) { super("Failed " + exceptions.size() + " action" + pluralize(exceptions) + ": " + - getDesc(exceptions,actions,addresses)); + getDesc(exceptions, actions, hostnameAndPort)); this.exceptions = exceptions; this.actions = actions; - this.addresses = addresses; + this.hostnameAndPort = hostnameAndPort; } public List<Throwable> getCauses() { @@ -73,8 +75,17 @@ public class RetriesExhaustedWithDetails return actions.get(i); } + /** + * @param i + * @return + * @deprecated + */ public HServerAddress getAddress(int i) { - return addresses.get(i); + return new HServerAddress(Addressing.createInetSocketAddressFromHostAndPortStr(getHostnamePort(i))); + } + + public String getHostnamePort(final int i) { + return this.hostnameAndPort.get(i); } public boolean mayHaveClusterIssues() { @@ -100,12 +111,12 @@ public class RetriesExhaustedWithDetails public static String getDesc(List<Throwable> exceptions, List<Row> actions, - List<HServerAddress> addresses) { + List<String> hostnamePort) { String s = getDesc(classifyExs(exceptions)); s += "servers with issues: "; - Set<HServerAddress> uniqAddr = new HashSet<HServerAddress>(); - uniqAddr.addAll(addresses); - for(HServerAddress addr : uniqAddr) { + Set<String> uniqAddr = new HashSet<String>(); + uniqAddr.addAll(hostnamePort); + for(String addr : uniqAddr) { s += addr + ", "; } return s; Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java Wed Apr 27 23:12:42 2011 @@ -22,8 +22,8 @@ package org.apache.hadoop.hbase.coproces import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.UnknownRegionException; import java.io.IOException; Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java Wed Apr 27 23:12:42 2011 @@ -138,23 +138,22 @@ public interface MasterObserver extends * Called prior to moving a given region from one region server to another. */ void preMove(final ObserverContext<MasterCoprocessorEnvironment> ctx, - final HRegionInfo region, final HServerInfo srcServer, - final HServerInfo destServer) + final HRegionInfo region, final ServerName srcServer, final ServerName destServer) throws UnknownRegionException; /** * Called after the region move has been requested. 
*/ void postMove(final ObserverContext<MasterCoprocessorEnvironment> ctx, - final HRegionInfo region, final HServerInfo srcServer, - final HServerInfo destServer) + final HRegionInfo region, final ServerName srcServer, final ServerName destServer) throws UnknownRegionException; /** * Called prior to assigning a specific region. */ void preAssign(final ObserverContext<MasterCoprocessorEnvironment> ctx, - final byte [] regionName, final boolean force) throws IOException; + final byte [] regionName, final boolean force) + throws IOException; /** * Called after the region assignment has been requested. Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java Wed Apr 27 23:12:42 2011 @@ -228,4 +228,4 @@ public abstract class EventHandler imple public synchronized void setListener(EventHandlerListener listener) { this.listener = listener; } -} +} \ No newline at end of file Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java Wed Apr 27 23:12:42 2011 @@ -23,6 +23,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import 
org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; @@ -42,7 +43,7 @@ public class RegionTransitionData implem private byte [] regionName; /** Server event originated from. Optional. */ - private String serverName; + private ServerName origin; /** Time the event was created. Required but automatically set. */ private long stamp; @@ -89,11 +90,11 @@ public class RegionTransitionData implem * * @param eventType type of event * @param regionName name of region as per <code>HRegionInfo#getRegionName()</code> - * @param serverName name of server setting data + * @param origin Originating {@link ServerName} */ public RegionTransitionData(EventType eventType, byte [] regionName, - String serverName) { - this(eventType, regionName, serverName, null); + final ServerName origin) { + this(eventType, regionName, origin, null); } /** @@ -107,16 +108,16 @@ public class RegionTransitionData implem * * @param eventType type of event * @param regionName name of region as per <code>HRegionInfo#getRegionName()</code> - * @param serverName name of server setting data + * @param origin Originating {@link ServerName} * @param payload Payload examples include the daughters involved in a * {@link EventType#RS_ZK_REGION_SPLIT}. 
Can be null */ public RegionTransitionData(EventType eventType, byte [] regionName, - String serverName, final byte [] payload) { + final ServerName serverName, final byte [] payload) { this.eventType = eventType; this.stamp = System.currentTimeMillis(); this.regionName = regionName; - this.serverName = serverName; + this.origin = serverName; this.payload = payload; } @@ -155,8 +156,8 @@ public class RegionTransitionData implem * * @return server name of originating regionserver, or null if from master */ - public String getServerName() { - return serverName; + public ServerName getOrigin() { + return origin; } /** @@ -185,10 +186,8 @@ public class RegionTransitionData implem regionName = Bytes.readByteArray(in); // remaining fields are optional so prefixed with boolean // the name of the regionserver sending the data - if(in.readBoolean()) { - serverName = in.readUTF(); - } else { - serverName = null; + if (in.readBoolean()) { + this.origin = new ServerName(in.readUTF()); } if (in.readBoolean()) { this.payload = Bytes.readByteArray(in); @@ -201,9 +200,9 @@ public class RegionTransitionData implem out.writeLong(System.currentTimeMillis()); Bytes.writeByteArray(out, regionName); // remaining fields are optional so prefixed with boolean - out.writeBoolean(serverName != null); - if(serverName != null) { - out.writeUTF(serverName); + out.writeBoolean(this.origin != null); + if(this.origin != null) { + out.writeUTF(this.origin.toString()); } out.writeBoolean(this.payload != null); if (this.payload != null) { @@ -244,7 +243,7 @@ public class RegionTransitionData implem @Override public String toString() { - return "region=" + Bytes.toString(regionName) + ", server=" + serverName + + return "region=" + Bytes.toString(regionName) + ", origin=" + this.origin + ", state=" + eventType; } } \ No newline at end of file Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Wed Apr 27 23:12:42 2011 @@ -41,10 +41,10 @@ import org.apache.hadoop.conf.Configured import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HMsg; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; @@ -148,8 +148,14 @@ public class HbaseObjectWritable impleme // Hbase types addToMap(HColumnDescriptor.class, code++); addToMap(HConstants.Modify.class, code++); - addToMap(HMsg.class, code++); - addToMap(HMsg[].class, code++); + + // We used to have a class named HMsg but its been removed. Rather than + // just axe it, use following random Integer class -- we just chose any + // class from java.lang -- instead just so codes that follow stay + // in same relative place. 
+ addToMap(Integer.class, code++); + addToMap(Integer[].class, code++); + addToMap(HRegion.class, code++); addToMap(HRegion[].class, code++); addToMap(HRegionInfo.class, code++); @@ -225,6 +231,8 @@ public class HbaseObjectWritable impleme addToMap(CompareOp.class, code++); addToMap(ColumnRangeFilter.class, code++); + + addToMap(HServerLoad.class, code++); } private Class<?> declaredClass; Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Wed Apr 27 23:12:42 2011 @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.ipc; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -41,7 +40,13 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -52,15 +57,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.WritableWithSize; +import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.ObjectWritable; 
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.hbase.util.ByteBufferOutputStream; import com.google.common.base.Function; import com.google.common.util.concurrent.ThreadFactoryBuilder; Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java Wed Apr 27 23:12:42 2011 @@ -19,22 +19,16 @@ */ package org.apache.hadoop.hbase.ipc; -import org.apache.hadoop.hbase.HMsg; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HServerInfo; +import java.io.IOException; + +import org.apache.hadoop.hbase.HServerLoad; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.ipc.VersionedProtocol; -import java.io.IOException; - /** - * HRegionServers interact with the HMasterRegionInterface to report on local - * goings-on and to obtain data-handling instructions from the HMaster. - * <p>Changes here need to be reflected in HbaseObjectWritable HbaseRPC#Invoker. - * - * <p>NOTE: if you change the interface, you must change the RPC version - * number in HBaseRPCProtocolVersion - * + * The Master publishes this Interface for RegionServers to register themselves + * on. 
*/ public interface HMasterRegionInterface extends VersionedProtocol { /** @@ -44,32 +38,27 @@ public interface HMasterRegionInterface // maintained a single global version number on all HBase Interfaces. This // meant all HBase RPC was broke though only one of the three RPC Interfaces // had changed. This has since been undone. - public static final long VERSION = 28L; + public static final long VERSION = 29L; /** - * Called when a region server first starts - * @param info server info + * Called when a region server first starts. + * @param port Port number this regionserver is up on. + * @param serverStartcode This server's startcode. * @param serverCurrentTime The current time of the region server in ms * @throws IOException e * @return Configuration for the regionserver to use: e.g. filesystem, - * hbase rootdir, etc. + * hbase rootdir, the hostname to use creating the RegionServer ServerName, + * etc. */ - public MapWritable regionServerStartup(HServerInfo info, - long serverCurrentTime) throws IOException; + public MapWritable regionServerStartup(final int port, + final long serverStartcode, final long serverCurrentTime) + throws IOException; /** - * Called to renew lease, tell master what the region server is doing and to - * receive new instructions from the master - * - * @param info server's address and start code - * @param msgs things the region server wants to tell the master - * @param mostLoadedRegions Array of HRegionInfos that should contain the - * reporting server's most loaded regions. These are candidates for being - * rebalanced. - * @return instructions from the master to the region server - * @throws IOException e + * @param sn {@link ServerName#getBytes()} + * @param hsl Server load. 
+ * @throws IOException */ - public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[], - HRegionInfo mostLoadedRegions[]) + public void regionServerReport(byte [] sn, HServerLoad hsl) throws IOException; } \ No newline at end of file Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Apr 27 23:12:42 2011 @@ -292,8 +292,10 @@ public interface HRegionInterface extend /** * Method used when a master is taking the place of another failed one. - * @return The HSI + * @return This server's {@link HServerInfo}; it has RegionServer POV on the + * hostname which may not agree w/ how the Master sees this server. * @throws IOException e + * @deprecated */ public HServerInfo getHServerInfo() throws IOException; Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Wed Apr 27 23:12:42 2011 @@ -140,8 +140,9 @@ class WritableRpcEngine implements RpcEn client.call(new Invocation(method, args), address, protocol, ticket, rpcTimeout); if (logDebug) { - long callTime = System.currentTimeMillis() - startTime; - LOG.debug("Call: " + method.getName() + " " + callTime); + // FIGURE HOW TO TURN THIS OFF! 
+ // long callTime = System.currentTimeMillis() - startTime; + // LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java?rev=1097275&r1=1097274&r2=1097275&view=diff ============================================================================== --- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (original) +++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java Wed Apr 27 23:12:42 2011 @@ -23,8 +23,9 @@ import java.util.concurrent.atomic.Atomi import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -48,13 +49,17 @@ class ActiveMasterManager extends ZooKee final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false); - private final HServerAddress address; + private final ServerName sn; private final Server master; - ActiveMasterManager(ZooKeeperWatcher watcher, HServerAddress address, - Server master) { + /** + * @param watcher + * @param sn ServerName + * @param master In an instance of a Master. 
+ */ + ActiveMasterManager(ZooKeeperWatcher watcher, ServerName sn, Server master) { super(watcher); - this.address = address; + this.sn = sn; this.master = master; } @@ -122,11 +127,11 @@ class ActiveMasterManager extends ZooKee boolean cleanSetOfActiveMaster = true; // Try to become the active master, watch if there is another master try { - if (ZKUtil.setAddressAndWatch(this.watcher, - this.watcher.masterAddressZNode, this.address)) { + if (ZKUtil.createEphemeralNodeAndWatch(this.watcher, + this.watcher.masterAddressZNode, Bytes.toBytes(this.sn.toString()))) { // We are the master, return this.clusterHasActiveMaster.set(true); - LOG.info("Master=" + this.address); + LOG.info("Master=" + this.sn); return cleanSetOfActiveMaster; } cleanSetOfActiveMaster = false; @@ -134,9 +139,10 @@ class ActiveMasterManager extends ZooKee // There is another active master running elsewhere or this is a restart // and the master ephemeral node has not expired yet. this.clusterHasActiveMaster.set(true); - HServerAddress currentMaster = - ZKUtil.getDataAsAddress(this.watcher, this.watcher.masterAddressZNode); - if (currentMaster != null && currentMaster.equals(this.address)) { + byte [] bytes = + ZKUtil.getDataAndWatch(this.watcher, this.watcher.masterAddressZNode); + ServerName currentMaster = new ServerName(Bytes.toString(bytes)); + if (currentMaster != null && currentMaster.equals(this.sn)) { LOG.info("Current master has this master's address, " + currentMaster + "; master was restarted? Waiting on znode to expire..."); // Hurry along the expiration of the znode. 
@@ -177,11 +183,11 @@ class ActiveMasterManager extends ZooKee public void stop() { try { // If our address is in ZK, delete it on our way out - HServerAddress zkAddress = - ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode); + byte [] bytes = + ZKUtil.getDataAndWatch(watcher, watcher.masterAddressZNode); // TODO: redo this to make it atomic (only added for tests) - if(zkAddress != null && - zkAddress.equals(address)) { + ServerName master = new ServerName(Bytes.toString(bytes)); + if(master != null && master.equals(this.sn)) { ZKUtil.deleteNode(watcher, watcher.masterAddressZNode); } } catch (KeeperException e) {
