http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java new file mode 100644 index 0000000..f6c7a2d --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java @@ -0,0 +1,629 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.EOFException; +import java.io.IOException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.ipc.FailedServerException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.MetaRegionServer;

/**
 * Utility class to perform operations (get/wait for/verify/set/delete) on the znode in
 * ZooKeeper which keeps the hbase:meta region server location.
 *
 * <p>Doesn't manage resources passed in (e.g. Connection, ZKWatcher etc).
 *
 * <p>Meta region location is set by <code>RegionServerServices</code>.
 * This class doesn't use ZK watchers, rather accesses ZK directly.
 *
 * <p>Apart from the {@code stopped} flag (set by {@link #stop()} to end the untimed wait), this
 * class is stateless. The only reason it's not made a non-instantiable util class with a
 * collection of static methods is that it'd be rather hard to mock properly in tests.
 *
 * TODO: rewrite using RPC calls to master to find out about hbase:meta.
 */
@InterfaceAudience.Private
public class MetaTableLocator {
  private static final Log LOG = LogFactory.getLog(MetaTableLocator.class);

  /** Minimum interval between "still waiting for meta" warnings in the untimed wait. */
  private static final long WAIT_WARN_INTERVAL_MS = 10000;

  // only needed to allow non-timeout infinite waits to stop when cluster shuts down
  private volatile boolean stopped = false;

  /**
   * Checks if the meta region location is available.
   * @param zkw ZooKeeper watcher to be used
   * @return true if meta region location is available, false if not
   */
  public boolean isLocationAvailable(ZKWatcher zkw) {
    return getMetaRegionLocation(zkw) != null;
  }

  /**
   * Gets the default replica of the meta region paired with its location.
   * @param zkw ZooKeeper watcher to be used
   * @return meta table regions and their locations.
   */
  public List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw) {
    return getMetaRegionsAndLocations(zkw, RegionInfo.DEFAULT_REPLICA_ID);
  }

  /**
   * Gets the given replica of the meta region paired with its location.
   * @param zkw ZooKeeper watcher to be used
   * @param replicaId the ID of the meta replica to look up
   * @return meta table regions and their locations; always a single-element list, whose
   *   location is null when {@link #getMetaRegionLocation(ZKWatcher, int)} returns null
   */
  public List<Pair<RegionInfo, ServerName>> getMetaRegionsAndLocations(ZKWatcher zkw,
      int replicaId) {
    ServerName serverName = getMetaRegionLocation(zkw, replicaId);
    List<Pair<RegionInfo, ServerName>> list = new ArrayList<>(1);
    list.add(new Pair<>(RegionReplicaUtil.getRegionInfoForReplica(
        RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId), serverName));
    return list;
  }

  /**
   * Gets the meta regions for the default replica.
   * @param zkw ZooKeeper watcher to be used
   * @return List of meta regions
   */
  public List<RegionInfo> getMetaRegions(ZKWatcher zkw) {
    return getMetaRegions(zkw, RegionInfo.DEFAULT_REPLICA_ID);
  }

  /**
   * Gets the meta regions for the given replica.
   * @param zkw ZooKeeper watcher to be used
   * @param replicaId the ID of the meta replica to look up
   * @return List of meta regions
   */
  public List<RegionInfo> getMetaRegions(ZKWatcher zkw, int replicaId) {
    return getListOfRegionInfos(getMetaRegionsAndLocations(zkw, replicaId));
  }

  /**
   * Extracts the region half of every (region, server) pair.
   * @param pairs pairs to unpack; may be null or empty
   * @return the regions in the order given, or an empty list when there are no pairs
   */
  private List<RegionInfo> getListOfRegionInfos(
      final List<Pair<RegionInfo, ServerName>> pairs) {
    if (pairs == null || pairs.isEmpty()) {
      // Typed factory rather than the raw Collections.EMPTY_LIST constant.
      return Collections.emptyList();
    }
    List<RegionInfo> result = new ArrayList<>(pairs.size());
    for (Pair<RegionInfo, ServerName> pair : pairs) {
      result.add(pair.getFirst());
    }
    return result;
  }

  /**
   * Gets the meta region location for the default replica, if available. Does not block.
   * @param zkw zookeeper connection to use
   * @return server name or null if we failed to get the data.
   */
  public ServerName getMetaRegionLocation(final ZKWatcher zkw) {
    return getMetaRegionLocation(zkw, RegionInfo.DEFAULT_REPLICA_ID);
  }

  /**
   * Gets the meta region location for the given replica, if available. Does not block.
   * @param zkw zookeeper connection to use
   * @param replicaId the ID of the meta replica to look up
   * @return server name, or null if the region is not in the opened state or reading
   *   ZooKeeper failed
   */
  public ServerName getMetaRegionLocation(final ZKWatcher zkw, int replicaId) {
    try {
      RegionState state = getMetaRegionState(zkw, replicaId);
      return state.isOpened() ? state.getServerName() : null;
    } catch (KeeperException ke) {
      // Best effort: callers treat a failed read the same as "no location yet".
      return null;
    }
  }

  /**
   * Gets the meta region location, if available, and waits for up to the
   * specified timeout if not immediately available.
   * Given the zookeeper notification could be delayed, we will try to
   * get the latest data.
   * @param zkw zookeeper connection to use
   * @param timeout maximum time to wait, in millis
   * @return server name for server hosting meta region formatted as per
   *   {@link ServerName}; never null
   * @throws InterruptedException if interrupted while waiting
   * @throws NotAllMetaRegionsOnlineException if the location did not show up within the timeout
   */
  public ServerName waitMetaRegionLocation(ZKWatcher zkw, long timeout)
      throws InterruptedException, NotAllMetaRegionsOnlineException {
    return waitMetaRegionLocation(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
  }

  /**
   * Gets the meta region location, if available, and waits for up to the
   * specified timeout if not immediately available.
   * Given the zookeeper notification could be delayed, we will try to
   * get the latest data.
   * @param zkw zookeeper connection to use
   * @param replicaId the ID of the meta replica to look up
   * @param timeout maximum time to wait, in millis
   * @return server name for server hosting meta region formatted as per
   *   {@link ServerName}; never null
   * @throws InterruptedException if interrupted while waiting
   * @throws NotAllMetaRegionsOnlineException if the location did not show up within the timeout
   * @throws IllegalArgumentException if the base znode does not exist
   * @throws IllegalStateException if checking the base znode fails with a KeeperException
   */
  public ServerName waitMetaRegionLocation(ZKWatcher zkw, int replicaId, long timeout)
      throws InterruptedException, NotAllMetaRegionsOnlineException {
    try {
      if (ZKUtil.checkExists(zkw, zkw.znodePaths.baseZNode) == -1) {
        String errorMsg = "Check the value configured in 'zookeeper.znode.parent'. "
            + "There could be a mismatch with the one configured in the master.";
        LOG.error(errorMsg);
        throw new IllegalArgumentException(errorMsg);
      }
    } catch (KeeperException e) {
      throw new IllegalStateException("KeeperException while trying to check baseZNode:", e);
    }
    ServerName sn = blockUntilAvailable(zkw, replicaId, timeout);

    if (sn == null) {
      throw new NotAllMetaRegionsOnlineException("Timed out; " + timeout + "ms");
    }

    return sn;
  }

  /**
   * Waits indefinitely for availability of <code>hbase:meta</code>. Used during
   * cluster startup. Does not verify meta, just that something has been
   * set up in zk. Returns early once {@link #stop()} has been called.
   * @param zkw zookeeper connection to use
   * @see #waitMetaRegionLocation(ZKWatcher, long)
   * @throws InterruptedException if interrupted while waiting
   */
  public void waitMetaRegionLocation(ZKWatcher zkw) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    long lastWarnTime = startTime;
    while (!stopped) {
      try {
        // The timed variant never returns null: it either yields a location or throws.
        if (waitMetaRegionLocation(zkw, 100) != null) {
          break;
        }
      } catch (NotAllMetaRegionsOnlineException e) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("hbase:meta still not available, sleeping and retrying." +
              " Reason: " + e.getMessage());
        }
        // Periodic progress warning. It lives on the retry path because that is the only
        // path taken while meta is unavailable; throttled by elapsed time so it fires
        // reliably instead of depending on an exact millisecond value.
        long now = System.currentTimeMillis();
        if (now - lastWarnTime >= WAIT_WARN_INTERVAL_MS) {
          LOG.warn("Have been waiting for meta to be assigned for " + (now - startTime) + "ms");
          lastWarnTime = now;
        }
      }
    }
  }

  /**
   * Verify <code>hbase:meta</code> is deployed and accessible.
   * @param hConnection connection used to reach the server hosting meta
   * @param zkw zookeeper connection to use
   * @param timeout How long to wait on zk for meta address (passed through to
   *   the internal call to {@link #getMetaServerConnection}).
   * @return True if the <code>hbase:meta</code> location is healthy.
   * @throws IOException if connecting to or querying the hosting server fails unexpectedly
   * @throws InterruptedException if interrupted while waiting for the meta location
   */
  public boolean verifyMetaRegionLocation(ClusterConnection hConnection,
      ZKWatcher zkw, final long timeout)
      throws InterruptedException, IOException {
    return verifyMetaRegionLocation(hConnection, zkw, timeout, RegionInfo.DEFAULT_REPLICA_ID);
  }

  /**
   * Verify <code>hbase:meta</code> is deployed and accessible.
   * @param connection connection used to reach the server hosting meta
   * @param zkw zookeeper connection to use
   * @param timeout How long to wait on zk for meta address (passed through to
   *   the internal call to {@link #getMetaServerConnection}).
   * @param replicaId the ID of the meta replica to verify
   * @return True if the <code>hbase:meta</code> location is healthy.
   * @throws InterruptedException if interrupted while waiting for the meta location
   * @throws IOException if connecting to or querying the hosting server fails unexpectedly
   */
  public boolean verifyMetaRegionLocation(ClusterConnection connection,
      ZKWatcher zkw, final long timeout, int replicaId)
      throws InterruptedException, IOException {
    AdminProtos.AdminService.BlockingInterface service = null;
    try {
      service = getMetaServerConnection(connection, zkw, timeout, replicaId);
    } catch (NotAllMetaRegionsOnlineException e) {
      // Pass
    } catch (ServerNotRunningYetException e) {
      // Pass -- remote server is not up so can't be carrying root
    } catch (UnknownHostException e) {
      // Pass -- server name doesn't resolve so it can't be assigned anything.
    } catch (RegionServerStoppedException e) {
      // Pass -- server name sends us to a server that is dying or already dead.
    }
    if (service == null) {
      return false;
    }
    byte[] regionName = RegionReplicaUtil.getRegionInfoForReplica(
        RegionInfoBuilder.FIRST_META_REGIONINFO, replicaId).getRegionName();
    return verifyRegionLocation(connection, service, getMetaRegionLocation(zkw, replicaId),
        regionName);
  }

  /**
   * Verify we can connect to <code>hostingServer</code> and that it's carrying
   * <code>regionName</code>.
   * @param connection used to create the RPC controller for the call
   * @param hostingServer Interface to the server hosting <code>regionName</code>
   * @param address The servername that goes with the <code>hostingServer</code>
   *   Interface. Used for logging only.
   * @param regionName The regionname we are interested in.
   * @return True if we were able to verify the region located at other side of
   *   the Interface.
   * @throws IOException declared for callers; every IOException raised by the lookup itself
   *   is logged and reported as a failed verification
   */
  // TODO: We should be able to get the ServerName from the AdminProtocol
  // rather than have to pass it in.  Its made awkward by the fact that the
  // HRI is likely a proxy against remote server so the getServerName needs
  // to be fixed to go to a local method or to a cache before we can do this.
  private boolean verifyRegionLocation(final ClusterConnection connection,
      AdminService.BlockingInterface hostingServer, final ServerName address,
      final byte [] regionName)
      throws IOException {
    if (hostingServer == null) {
      LOG.info("Passed hostingServer is null");
      return false;
    }
    Throwable t;
    HBaseRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      // Try and get regioninfo from the hosting server.
      return ProtobufUtil.getRegionInfo(controller, hostingServer, regionName) != null;
    } catch (ConnectException e) {
      t = e;
    } catch (RetriesExhaustedException e) {
      t = e;
    } catch (RemoteException e) {
      t = e.unwrapRemoteException();
    } catch (IOException e) {
      // Report the underlying cause for disconnect-style failures, else the exception itself.
      Throwable cause = e.getCause();
      if (cause instanceof EOFException) {
        t = cause;
      } else if (cause != null && cause.getMessage() != null
          && cause.getMessage().contains("Connection reset")) {
        t = cause;
      } else {
        t = e;
      }
    }
    LOG.info("Failed verification of " + Bytes.toStringBinary(regionName) +
        " at address=" + address + ", exception=" + t.getMessage());
    return false;
  }

  /**
   * Gets a connection to the server hosting meta, as reported by ZooKeeper,
   * waiting up to the specified timeout for availability.
   * <p>WARNING: Does not retry.  Use an {@link org.apache.hadoop.hbase.client.HTable} instead.
   * @param connection connection used to obtain the admin interface
   * @param zkw zookeeper connection to use
   * @param timeout How long to wait on meta location
   * @param replicaId the ID of the meta replica to connect to
   * @return connection to server hosting meta
   * @throws InterruptedException if interrupted while waiting for the meta location
   * @throws NotAllMetaRegionsOnlineException if timed out waiting
   * @throws IOException if obtaining the admin interface fails unexpectedly
   */
  private AdminService.BlockingInterface getMetaServerConnection(ClusterConnection connection,
      ZKWatcher zkw, long timeout, int replicaId)
      throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
    return getCachedConnection(connection, waitMetaRegionLocation(zkw, replicaId, timeout));
  }

  /**
   * @param connection connection used to obtain the admin interface
   * @param sn ServerName to get a connection against.
   * @return The AdminProtocol we got when we connected to <code>sn</code>
   *   May have come from cache, may not be good, may have been setup by this
   *   invocation, or may be null.
   * @throws IOException for failures other than the connectivity problems handled below
   */
  private static AdminService.BlockingInterface getCachedConnection(ClusterConnection connection,
      ServerName sn)
      throws IOException {
    if (sn == null) {
      return null;
    }
    AdminService.BlockingInterface service = null;
    try {
      service = connection.getAdmin(sn);
    } catch (RetriesExhaustedException e) {
      if (!(e.getCause() instanceof ConnectException)) {
        throw e;
      }
      // Otherwise swallow; presume it means the cached connection has gone bad.
    } catch (SocketTimeoutException e) {
      LOG.debug("Timed out connecting to " + sn);
    } catch (NoRouteToHostException e) {
      LOG.debug("Connecting to " + sn, e);
    } catch (SocketException e) {
      // Also covers ConnectException (connection refused), which is a SocketException.
      LOG.debug("Exception connecting to " + sn);
    } catch (UnknownHostException e) {
      LOG.debug("Unknown host exception connecting to " + sn);
    } catch (FailedServerException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Server " + sn + " is in failed server list.");
      }
    } catch (IOException ioe) {
      Throwable cause = ioe.getCause();
      boolean disconnected = cause instanceof EOFException;
      boolean connectionReset = cause != null && cause.getMessage() != null &&
          cause.getMessage().toLowerCase(Locale.ROOT).contains("connection reset");
      if (!disconnected && !connectionReset) {
        throw ioe;
      }
      // Otherwise swallow: the other end disconnected us or reset the connection.
    }
    return service;
  }

  /**
   * Sets the location of <code>hbase:meta</code> in ZooKeeper to the
   * specified server address.
   * @param zookeeper zookeeper reference
   * @param serverName The server hosting <code>hbase:meta</code>
   * @param state The region transition state
   * @throws KeeperException unexpected zookeeper exception
   */
  public static void setMetaLocation(ZKWatcher zookeeper,
      ServerName serverName, RegionState.State state) throws KeeperException {
    setMetaLocation(zookeeper, serverName, RegionInfo.DEFAULT_REPLICA_ID, state);
  }

  /**
   * Sets the location of <code>hbase:meta</code> in ZooKeeper to the
   * specified server address. Does nothing but log a warning when <code>serverName</code>
   * is null.
   * @param zookeeper zookeeper reference
   * @param serverName The server hosting <code>hbase:meta</code>
   * @param replicaId the ID of the meta replica whose location is being set
   * @param state The region transition state
   * @throws KeeperException unexpected zookeeper exception
   */
  public static void setMetaLocation(ZKWatcher zookeeper,
      ServerName serverName, int replicaId, RegionState.State state) throws KeeperException {
    if (serverName == null) {
      LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
      return;
    }
    LOG.info("Setting hbase:meta (replicaId=" + replicaId + ") location in ZooKeeper as " +
        serverName);
    // Make the MetaRegionServer pb and then get its bytes and save this as
    // the znode content.
    MetaRegionServer pbrsr = MetaRegionServer.newBuilder()
        .setServer(ProtobufUtil.toServerName(serverName))
        .setRpcVersion(HConstants.RPC_CURRENT_VERSION)
        .setState(state.convert()).build();
    byte[] data = ProtobufUtil.prependPBMagic(pbrsr.toByteArray());
    String znode = zookeeper.znodePaths.getZNodeForReplica(replicaId);
    try {
      ZKUtil.setData(zookeeper, znode, data);
    } catch (KeeperException.NoNodeException nne) {
      if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
        LOG.debug("META region location doesn't exist, create it");
      } else {
        LOG.debug("META region location doesn't exist for replicaId=" + replicaId +
            ", create it");
      }
      ZKUtil.createAndWatch(zookeeper, znode, data);
    }
  }

  /**
   * Load the meta region state from the meta server ZNode.
   * @param zkw zookeeper connection to use
   * @return state of the default meta replica
   * @throws KeeperException if reading or parsing the znode fails
   */
  public static RegionState getMetaRegionState(ZKWatcher zkw) throws KeeperException {
    return getMetaRegionState(zkw, RegionInfo.DEFAULT_REPLICA_ID);
  }

  /**
   * Load the meta region state from the meta server ZNode.
   * @param zkw zookeeper connection to use
   * @param replicaId the ID of the meta replica to look up
   * @return regionstate; its state is OFFLINE and its server name null when no server name
   *   could be read
   * @throws KeeperException if reading the znode fails or its content cannot be parsed
   */
  public static RegionState getMetaRegionState(ZKWatcher zkw, int replicaId)
      throws KeeperException {
    // A record without an explicit state is taken to mean OPEN.
    RegionState.State state = RegionState.State.OPEN;
    ServerName serverName = null;
    try {
      byte[] data = ZKUtil.getData(zkw, zkw.znodePaths.getZNodeForReplica(replicaId));
      if (data != null && data.length > 0 && ProtobufUtil.isPBMagicPrefix(data)) {
        try {
          int prefixLen = ProtobufUtil.lengthOfPBMagic();
          ZooKeeperProtos.MetaRegionServer rl =
              ZooKeeperProtos.MetaRegionServer.PARSER.parseFrom(
                  data, prefixLen, data.length - prefixLen);
          if (rl.hasState()) {
            state = RegionState.State.convert(rl.getState());
          }
          HBaseProtos.ServerName sn = rl.getServer();
          serverName = ServerName.valueOf(
              sn.getHostName(), sn.getPort(), sn.getStartCode());
        } catch (InvalidProtocolBufferException e) {
          // Chain the protobuf failure so the root cause is not lost.
          throw new DeserializationException("Unable to parse meta region location", e);
        }
      } else {
        // old style of meta region location?
        serverName = ProtobufUtil.parseServerNameFrom(data);
      }
    } catch (DeserializationException e) {
      throw ZKUtil.convert(e);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and fall through; serverName stays null => OFFLINE.
      Thread.currentThread().interrupt();
    }
    if (serverName == null) {
      state = RegionState.State.OFFLINE;
    }
    return new RegionState(
        RegionReplicaUtil.getRegionInfoForReplica(RegionInfoBuilder.FIRST_META_REGIONINFO,
            replicaId),
        state, serverName);
  }

  /**
   * Deletes the location of <code>hbase:meta</code> in ZooKeeper.
   * @param zookeeper zookeeper reference
   * @throws KeeperException unexpected zookeeper exception
   */
  public void deleteMetaLocation(ZKWatcher zookeeper)
      throws KeeperException {
    deleteMetaLocation(zookeeper, RegionInfo.DEFAULT_REPLICA_ID);
  }

  /**
   * Deletes the location of the given <code>hbase:meta</code> replica in ZooKeeper.
   * A znode that is already gone is not treated as an error.
   * @param zookeeper zookeeper reference
   * @param replicaId the ID of the meta replica whose location znode is deleted
   * @throws KeeperException unexpected zookeeper exception
   */
  public void deleteMetaLocation(ZKWatcher zookeeper, int replicaId)
      throws KeeperException {
    if (replicaId == RegionInfo.DEFAULT_REPLICA_ID) {
      LOG.info("Deleting hbase:meta region location in ZooKeeper");
    } else {
      LOG.info("Deleting hbase:meta for " + replicaId + " region location in ZooKeeper");
    }
    try {
      // Just delete the node.  Don't need any watches.
      ZKUtil.deleteNode(zookeeper, zookeeper.znodePaths.getZNodeForReplica(replicaId));
    } catch (KeeperException.NoNodeException nne) {
      // Has already been deleted
    }
  }

  /**
   * Wait until the primary meta region is available. Get the secondary
   * locations as well but don't block for those.
   * @param zkw zookeeper connection to use
   * @param timeout maximum time to wait for the primary, in millis
   * @param conf not read by this method
   * @return the primary location followed by one entry per additional meta replica znode (an
   *   entry is null when that replica has no location), or null if we timed out waiting for
   *   the primary.
   * @throws InterruptedException if interrupted while waiting
   */
  public List<ServerName> blockUntilAvailable(final ZKWatcher zkw,
      final long timeout, Configuration conf)
      throws InterruptedException {
    int numReplicasConfigured = 1;

    List<ServerName> servers = new ArrayList<>();
    // Make the blocking call first so that we do the wait to know
    // the znodes are all in place or timeout.
    ServerName server = blockUntilAvailable(zkw, timeout);
    if (server == null) {
      return null;
    }
    servers.add(server);

    try {
      List<String> metaReplicaNodes = zkw.getMetaReplicaNodes();
      numReplicasConfigured = metaReplicaNodes.size();
    } catch (KeeperException e) {
      // Fall back to reporting the primary only.
      LOG.warn("Got ZK exception " + e, e);
    }
    for (int replicaId = 1; replicaId < numReplicasConfigured; replicaId++) {
      // return all replica locations for the meta
      servers.add(getMetaRegionLocation(zkw, replicaId));
    }
    return servers;
  }

  /**
   * Wait until the meta region is available and is not in transition.
   * @param zkw zookeeper connection to use
   * @param timeout maximum time to wait, in millis
   * @return ServerName or null if we timed out.
   * @throws InterruptedException if interrupted while waiting
   */
  public ServerName blockUntilAvailable(final ZKWatcher zkw,
      final long timeout)
      throws InterruptedException {
    return blockUntilAvailable(zkw, RegionInfo.DEFAULT_REPLICA_ID, timeout);
  }

  /**
   * Wait until the meta region is available and is not in transition.
   * @param zkw zookeeper connection to use
   * @param replicaId the ID of the meta replica to wait for
   * @param timeout maximum time to wait, in millis
   * @return ServerName or null if we timed out.
   * @throws InterruptedException if interrupted while waiting
   * @throws IllegalArgumentException if <code>timeout</code> is negative or <code>zkw</code>
   *   is null
   */
  public ServerName blockUntilAvailable(final ZKWatcher zkw, int replicaId,
      final long timeout)
      throws InterruptedException {
    if (timeout < 0) {
      throw new IllegalArgumentException("timeout must not be negative: " + timeout);
    }
    if (zkw == null) {
      throw new IllegalArgumentException("zkw must not be null");
    }
    long startTime = System.currentTimeMillis();
    ServerName sn = null;
    while (true) {
      sn = getMetaRegionLocation(zkw, replicaId);
      // Give up once another full retry sleep would overrun the timeout.
      if (sn != null || (System.currentTimeMillis() - startTime)
          > timeout - HConstants.SOCKET_RETRY_WAIT_MS) {
        break;
      }
      Thread.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
    }
    return sn;
  }

  /**
   * Stop working.
   * Makes the untimed {@link #waitMetaRegionLocation(ZKWatcher)} loop exit at its next check.
   */
  public void stop() {
    if (!stopped) {
      LOG.debug("Stopping MetaTableLocator");
      stopped = true;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java new file mode 100644 index 0000000..ef643bf --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java @@ -0,0 +1,472 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InterruptedIOException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnLog; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +/** + * TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead + * of redoing it, we should contribute updates to their code which let us more + * easily access testing helper objects. + */ +@InterfaceAudience.Public +public class MiniZooKeeperCluster { + private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class); + + private static final int TICK_TIME = 2000; + private static final int DEFAULT_CONNECTION_TIMEOUT = 30000; + private int connectionTimeout; + + private boolean started; + + /** The default port. If zero, we use a random port. 
*/ + private int defaultClientPort = 0; + + private List<NIOServerCnxnFactory> standaloneServerFactoryList; + private List<ZooKeeperServer> zooKeeperServers; + private List<Integer> clientPortList; + + private int activeZKServerIndex; + private int tickTime = 0; + + private Configuration configuration; + + public MiniZooKeeperCluster() { + this(new Configuration()); + } + + public MiniZooKeeperCluster(Configuration configuration) { + this.started = false; + this.configuration = configuration; + activeZKServerIndex = -1; + zooKeeperServers = new ArrayList<>(); + clientPortList = new ArrayList<>(); + standaloneServerFactoryList = new ArrayList<>(); + connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", + DEFAULT_CONNECTION_TIMEOUT); + } + + /** + * Add a client port to the list. + * + * @param clientPort the specified port + */ + public void addClientPort(int clientPort) { + clientPortList.add(clientPort); + } + + /** + * Get the list of client ports. + * @return clientPortList the client port list + */ + @VisibleForTesting + public List<Integer> getClientPortList() { + return clientPortList; + } + + /** + * Check whether the client port in a specific position of the client port list is valid. + * + * @param index the specified position + */ + private boolean hasValidClientPortInList(int index) { + return (clientPortList.size() > index && clientPortList.get(index) > 0); + } + + public void setDefaultClientPort(int clientPort) { + if (clientPort <= 0) { + throw new IllegalArgumentException("Invalid default ZK client port: " + + clientPort); + } + this.defaultClientPort = clientPort; + } + + /** + * Selects a ZK client port. + * + * @param seedPort the seed port to start with; -1 means first time. 
+ * @Returns a valid and unused client port + */ + private int selectClientPort(int seedPort) { + int i; + int returnClientPort = seedPort + 1; + if (returnClientPort == 0) { + // If the new port is invalid, find one - starting with the default client port. + // If the default client port is not specified, starting with a random port. + // The random port is selected from the range between 49152 to 65535. These ports cannot be + // registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports). + if (defaultClientPort > 0) { + returnClientPort = defaultClientPort; + } else { + returnClientPort = 0xc000 + new Random().nextInt(0x3f00); + } + } + // Make sure that the port is unused. + while (true) { + for (i = 0; i < clientPortList.size(); i++) { + if (returnClientPort == clientPortList.get(i)) { + // Already used. Update the port and retry. + returnClientPort++; + break; + } + } + if (i == clientPortList.size()) { + break; // found a unused port, exit + } + } + return returnClientPort; + } + + public void setTickTime(int tickTime) { + this.tickTime = tickTime; + } + + public int getBackupZooKeeperServerNum() { + return zooKeeperServers.size()-1; + } + + public int getZooKeeperServerNum() { + return zooKeeperServers.size(); + } + + // / XXX: From o.a.zk.t.ClientBase + private static void setupTestEnv() { + // during the tests we run with 100K prealloc in the logs. + // on windows systems prealloc of 64M was seen to take ~15seconds + // resulting in test failure (client timeout on first session). 
+ // set env and directly in order to handle static init/gc issues + System.setProperty("zookeeper.preAllocSize", "100"); + FileTxnLog.setPreallocSize(100 * 1024); + } + + public int startup(File baseDir) throws IOException, InterruptedException { + int numZooKeeperServers = clientPortList.size(); + if (numZooKeeperServers == 0) { + numZooKeeperServers = 1; // need at least 1 ZK server for testing + } + return startup(baseDir, numZooKeeperServers); + } + + /** + * @param baseDir + * @param numZooKeeperServers + * @return ClientPort server bound to, -1 if there was a + * binding problem and we couldn't pick another port. + * @throws IOException + * @throws InterruptedException + */ + public int startup(File baseDir, int numZooKeeperServers) throws IOException, + InterruptedException { + if (numZooKeeperServers <= 0) + return -1; + + setupTestEnv(); + shutdown(); + + int tentativePort = -1; // the seed port + int currentClientPort; + + // running all the ZK servers + for (int i = 0; i < numZooKeeperServers; i++) { + File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile(); + createDir(dir); + int tickTimeToUse; + if (this.tickTime > 0) { + tickTimeToUse = this.tickTime; + } else { + tickTimeToUse = TICK_TIME; + } + + // Set up client port - if we have already had a list of valid ports, use it. 
+ if (hasValidClientPortInList(i)) { + currentClientPort = clientPortList.get(i); + } else { + tentativePort = selectClientPort(tentativePort); // update the seed + currentClientPort = tentativePort; + } + + ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse); + // Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper + server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout", -1)); + server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout", -1)); + NIOServerCnxnFactory standaloneServerFactory; + while (true) { + try { + standaloneServerFactory = new NIOServerCnxnFactory(); + standaloneServerFactory.configure( + new InetSocketAddress(currentClientPort), + configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS)); + } catch (BindException e) { + LOG.debug("Failed binding ZK Server to client port: " + + currentClientPort, e); + // We're told to use some port but it's occupied, fail + if (hasValidClientPortInList(i)) { + return -1; + } + // This port is already in use, try to use another. + tentativePort = selectClientPort(tentativePort); + currentClientPort = tentativePort; + continue; + } + break; + } + + // Start up this ZK server + standaloneServerFactory.startup(server); + // Runs a 'stat' against the servers. + if (!waitForServerUp(currentClientPort, connectionTimeout)) { + throw new IOException("Waiting for startup of standalone server"); + } + + // We have selected a port as a client port. Update clientPortList if necessary. 
+ if (clientPortList.size() <= i) { // it is not in the list, add the port + clientPortList.add(currentClientPort); + } + else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port + clientPortList.remove(i); + clientPortList.add(i, currentClientPort); + } + + standaloneServerFactoryList.add(standaloneServerFactory); + zooKeeperServers.add(server); + } + + // set the first one to be active ZK; Others are backups + activeZKServerIndex = 0; + started = true; + int clientPort = clientPortList.get(activeZKServerIndex); + LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " + + "on client port=" + clientPort); + return clientPort; + } + + private void createDir(File dir) throws IOException { + try { + if (!dir.exists()) { + dir.mkdirs(); + } + } catch (SecurityException e) { + throw new IOException("creating dir: " + dir, e); + } + } + + /** + * @throws IOException + */ + public void shutdown() throws IOException { + // shut down all the zk servers + for (int i = 0; i < standaloneServerFactoryList.size(); i++) { + NIOServerCnxnFactory standaloneServerFactory = + standaloneServerFactoryList.get(i); + int clientPort = clientPortList.get(i); + + standaloneServerFactory.shutdown(); + if (!waitForServerDown(clientPort, connectionTimeout)) { + throw new IOException("Waiting for shutdown of standalone server"); + } + } + standaloneServerFactoryList.clear(); + + for (ZooKeeperServer zkServer: zooKeeperServers) { + //explicitly close ZKDatabase since ZookeeperServer does not close them + zkServer.getZKDatabase().close(); + } + zooKeeperServers.clear(); + + // clear everything + if (started) { + started = false; + activeZKServerIndex = 0; + clientPortList.clear(); + LOG.info("Shutdown MiniZK cluster with all ZK servers"); + } + } + + /**@return clientPort return clientPort if there is another ZK backup can run + * when killing the current active; return -1, if there is no backups. 
+ * @throws IOException + * @throws InterruptedException + */ + public int killCurrentActiveZooKeeperServer() throws IOException, + InterruptedException { + if (!started || activeZKServerIndex < 0) { + return -1; + } + + // Shutdown the current active one + NIOServerCnxnFactory standaloneServerFactory = + standaloneServerFactoryList.get(activeZKServerIndex); + int clientPort = clientPortList.get(activeZKServerIndex); + + standaloneServerFactory.shutdown(); + if (!waitForServerDown(clientPort, connectionTimeout)) { + throw new IOException("Waiting for shutdown of standalone server"); + } + + zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close(); + + // remove the current active zk server + standaloneServerFactoryList.remove(activeZKServerIndex); + clientPortList.remove(activeZKServerIndex); + zooKeeperServers.remove(activeZKServerIndex); + LOG.info("Kill the current active ZK servers in the cluster " + + "on client port: " + clientPort); + + if (standaloneServerFactoryList.isEmpty()) { + // there is no backup servers; + return -1; + } + clientPort = clientPortList.get(activeZKServerIndex); + LOG.info("Activate a backup zk server in the cluster " + + "on client port: " + clientPort); + // return the next back zk server's port + return clientPort; + } + + /** + * Kill one back up ZK servers + * @throws IOException + * @throws InterruptedException + */ + public void killOneBackupZooKeeperServer() throws IOException, + InterruptedException { + if (!started || activeZKServerIndex < 0 || + standaloneServerFactoryList.size() <= 1) { + return ; + } + + int backupZKServerIndex = activeZKServerIndex+1; + // Shutdown the current active one + NIOServerCnxnFactory standaloneServerFactory = + standaloneServerFactoryList.get(backupZKServerIndex); + int clientPort = clientPortList.get(backupZKServerIndex); + + standaloneServerFactory.shutdown(); + if (!waitForServerDown(clientPort, connectionTimeout)) { + throw new IOException("Waiting for shutdown of standalone 
server"); + } + + zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close(); + + // remove this backup zk server + standaloneServerFactoryList.remove(backupZKServerIndex); + clientPortList.remove(backupZKServerIndex); + zooKeeperServers.remove(backupZKServerIndex); + LOG.info("Kill one backup ZK servers in the cluster " + + "on client port: " + clientPort); + } + + // XXX: From o.a.zk.t.ClientBase + private static boolean waitForServerDown(int port, long timeout) throws IOException { + long start = System.currentTimeMillis(); + while (true) { + try { + Socket sock = new Socket("localhost", port); + try { + OutputStream outstream = sock.getOutputStream(); + outstream.write("stat".getBytes()); + outstream.flush(); + } finally { + sock.close(); + } + } catch (IOException e) { + return true; + } + + if (System.currentTimeMillis() > start + timeout) { + break; + } + try { + Thread.sleep(250); + } catch (InterruptedException e) { + throw (InterruptedIOException)new InterruptedIOException().initCause(e); + } + } + return false; + } + + // XXX: From o.a.zk.t.ClientBase + private static boolean waitForServerUp(int port, long timeout) throws IOException { + long start = System.currentTimeMillis(); + while (true) { + try { + Socket sock = new Socket("localhost", port); + BufferedReader reader = null; + try { + OutputStream outstream = sock.getOutputStream(); + outstream.write("stat".getBytes()); + outstream.flush(); + + Reader isr = new InputStreamReader(sock.getInputStream()); + reader = new BufferedReader(isr); + String line = reader.readLine(); + if (line != null && line.startsWith("Zookeeper version:")) { + return true; + } + } finally { + sock.close(); + if (reader != null) { + reader.close(); + } + } + } catch (IOException e) { + // ignore as this is expected + LOG.info("server localhost:" + port + " not up " + e); + } + + if (System.currentTimeMillis() > start + timeout) { + break; + } + try { + Thread.sleep(250); + } catch (InterruptedException e) { + throw 
(InterruptedIOException)new InterruptedIOException().initCause(e); + } + } + return false; + } + + public int getClientPort() { + return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1 + : clientPortList.get(activeZKServerIndex); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java new file mode 100644 index 0000000..da7d176 --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/PendingWatcher.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +/** + * Placeholder of a watcher which might be triggered before the instance is not yet created. 
+ * <p> + * {@code ZooKeeper} starts its event thread within its constructor (and that is an anti-pattern), + * and the watcher passed to the constructor might be called back by the event thread + * before you get the instance of {@code ZooKeeper} from the constructor. + * If your watcher calls methods of {@code ZooKeeper}, + * pass this placeholder to the constructor of the {@code ZooKeeper}, + * create your watcher using the instance of {@code ZooKeeper}, + * and then call the method {@code PendingWatcher.prepare}. + */
class PendingWatcher implements Watcher {
  // Holds the real watcher once prepare(Watcher) has supplied it.
  private final InstancePending<Watcher> delegate = new InstancePending<>();

  /**
   * Forwards the event to the watcher obtained from the pending holder.
   */
  @Override
  public void process(WatchedEvent event) {
    Watcher actual = delegate.get();
    actual.process(event);
  }

  /**
   * Supplies the real watcher that will process events.
   * Call this exactly once, with a non-null {@code watcher}, and as early as possible:
   * event processing on the ZooKeeper event thread is uninterruptibly blocked
   * until this method has been called.
   */
  void prepare(Watcher watcher) {
    delegate.prepare(watcher);
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java new file mode 100644 index 0000000..d6c11af --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -0,0 +1,810 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.htrace.core.TraceScope; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.proto.SetDataRequest; + +/** + * A zookeeper that can handle 'recoverable' errors. + * To handle recoverable errors, developers need to realize that there are two + * classes of requests: idempotent and non-idempotent requests. 
Read requests + * and unconditional sets and deletes are examples of idempotent requests; they + * can be reissued with the same results. + * (Although the delete may throw a NoNodeException on reissue, its effect on + * the ZooKeeper state is the same.) Non-idempotent requests need special + * handling; application and library writers need to keep in mind that they may + * need to encode information in the data or name of znodes to detect + * retries. A simple example is a create that uses a sequence flag. + * If a process issues a create("/x-", ..., SEQUENCE) and gets a connection + * loss exception, that process will reissue another + * create("/x-", ..., SEQUENCE) and get back x-111. When the process does a + * getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be + * that x-109 was the result of the previous create, so the process actually + * owns both x-109 and x-111. An easy way around this is to use "x-process id-" + * when doing the create. If the process is using an id of 352, before reissuing + * the create it will do a getChildren("/") and see "x-222-1", "x-542-30", + * "x-352-109", "x-333-110". The process will know that the original create + * succeeded and the znode it created is "x-352-109".
+ * @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling" + */ +@InterfaceAudience.Private +public class RecoverableZooKeeper { + private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class); + // the actual ZooKeeper client instance + private ZooKeeper zk; + private final RetryCounterFactory retryCounterFactory; + // An identifier of this process in the cluster + private final String identifier; + private final byte[] id; + private Watcher watcher; + private int sessionTimeout; + private String quorumServers; + private final ZKMetricsListener metrics; + + public RecoverableZooKeeper(String quorumServers, int sessionTimeout, + Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime) + throws IOException { + this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime, + null); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE", + justification="None. Its always been this way.") + public RecoverableZooKeeper(String quorumServers, int sessionTimeout, + Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier) + throws IOException { + // TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should. + this.retryCounterFactory = + new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime); + + if (identifier == null || identifier.length() == 0) { + // the identifier = processID@hostName + identifier = ManagementFactory.getRuntimeMXBean().getName(); + } + LOG.info("Process identifier=" + identifier + + " connecting to ZooKeeper ensemble=" + quorumServers); + this.identifier = identifier; + this.id = Bytes.toBytes(identifier); + + this.watcher = watcher; + this.sessionTimeout = sessionTimeout; + this.quorumServers = quorumServers; + this.metrics = new ZKMetrics(); + try {checkZk();} catch (Exception x) {/* ignore */} + } + + /** + * Try to create a ZooKeeper connection. 
Turns any exception encountered into a + * KeeperException.OperationTimeoutException so it can retried. + * @return The created ZooKeeper connection object + * @throws KeeperException + */ + protected synchronized ZooKeeper checkZk() throws KeeperException { + if (this.zk == null) { + try { + this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher); + } catch (IOException ex) { + LOG.warn("Unable to create ZooKeeper Connection", ex); + throw new KeeperException.OperationTimeoutException(); + } + } + return zk; + } + + public synchronized void reconnectAfterExpiration() + throws IOException, KeeperException, InterruptedException { + if (zk != null) { + LOG.info("Closing dead ZooKeeper connection, session" + + " was: 0x"+Long.toHexString(zk.getSessionId())); + zk.close(); + // reset the ZooKeeper connection + zk = null; + } + checkZk(); + LOG.info("Recreated a ZooKeeper, session" + + " is: 0x"+Long.toHexString(zk.getSessionId())); + } + + /** + * delete is an idempotent operation. Retry before throwing exception. + * This function will not throw NoNodeException if the path does not + * exist. + */ + public void delete(String path, int version) throws InterruptedException, KeeperException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) { + RetryCounter retryCounter = retryCounterFactory.create(); + boolean isRetry = false; // False for first attempt, true for all retries. + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + checkZk().delete(path, version); + this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case NONODE: + if (isRetry) { + LOG.debug("Node " + path + " already deleted. 
Assuming a " + + "previous attempt succeeded."); + return; + } + LOG.debug("Node " + path + " already deleted, retry=" + isRetry); + throw e; + + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "delete"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "delete"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + isRetry = true; + } + } + } + + /** + * exists is an idempotent operation. Retry before throwing exception + * @return A Stat instance + */ + public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + Stat nodeStat = checkZk().exists(path, watcher); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return nodeStat; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "exists"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "exists"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + } + + /** + * exists is an idempotent operation. 
Retry before throwing exception + * @return A Stat instance + */ + public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + Stat nodeStat = checkZk().exists(path, watch); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return nodeStat; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "exists"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "exists"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + } + + private void retryOrThrow(RetryCounter retryCounter, KeeperException e, + String opName) throws KeeperException { + if (!retryCounter.shouldRetry()) { + LOG.error("ZooKeeper " + opName + " failed after " + + retryCounter.getMaxAttempts() + " attempts"); + throw e; + } + LOG.debug("Retry, connectivity issue (JVM Pause?); quorum=" + quorumServers + "," + + "exception=" + e); + } + + /** + * getChildren is an idempotent operation. 
Retry before throwing exception + * @return List of children znodes + */ + public List<String> getChildren(String path, Watcher watcher) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + List<String> children = checkZk().getChildren(path, watcher); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return children; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "getChildren"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "getChildren"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + } + + /** + * getChildren is an idempotent operation. 
Retry before throwing exception + * @return List of children znodes + */ + public List<String> getChildren(String path, boolean watch) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + List<String> children = checkZk().getChildren(path, watch); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return children; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "getChildren"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "getChildren"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + } + + /** + * getData is an idempotent operation. 
Retry before throwing exception + * @return Data + */ + public byte[] getData(String path, Watcher watcher, Stat stat) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + byte[] revData = checkZk().getData(path, watcher, stat); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return ZKMetadata.removeMetaData(revData); + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "getData"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "getData"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + } + + /** + * getData is an idempotent operation. 
Retry before throwing exception + * @return Data + */ + public byte[] getData(String path, boolean watch, Stat stat) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + byte[] revData = checkZk().getData(path, watch, stat); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return ZKMetadata.removeMetaData(revData); + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "getData"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "getData"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + } + + /** + * setData is NOT an idempotent operation. 
Retry may cause BadVersion Exception + * Adding an identifier field into the data to check whether + * badversion is caused by the result of previous correctly setData + * @return Stat instance + */ + public Stat setData(String path, byte[] data, int version) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) { + RetryCounter retryCounter = retryCounterFactory.create(); + byte[] newData = ZKMetadata.appendMetaData(id, data); + boolean isRetry = false; + long startTime; + while (true) { + try { + startTime = EnvironmentEdgeManager.currentTime(); + Stat nodeStat = checkZk().setData(path, newData, version); + this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return nodeStat; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "setData"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "setData"); + break; + case BADVERSION: + if (isRetry) { + // try to verify whether the previous setData success or not + try{ + Stat stat = new Stat(); + startTime = EnvironmentEdgeManager.currentTime(); + byte[] revData = checkZk().getData(path, false, stat); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + if(Bytes.compareTo(revData, newData) == 0) { + // the bad version is caused by previous successful setData + return stat; + } + } catch(KeeperException keeperException){ + this.metrics.registerFailedZKCall(); + // the ZK is not reliable at this moment. 
just throwing exception + throw keeperException; + } + } + // throw other exceptions and verified bad version exceptions + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + isRetry = true; + } + } + } + + /** + * getAcl is an idempotent operation. Retry before throwing exception + * @return list of ACLs + */ + public List<ACL> getAcl(String path, Stat stat) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + List<ACL> nodeACL = checkZk().getACL(path, stat); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return nodeACL; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "getAcl"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "getAcl"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + } + + /** + * setAcl is an idempotent operation. 
Retry before throwing exception + * @return list of ACLs + */ + public Stat setAcl(String path, List<ACL> acls, int version) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + Stat nodeStat = checkZk().setACL(path, acls, version); + this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return nodeStat; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "setAcl"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "setAcl"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + } + + /** + * <p> + * NONSEQUENTIAL create is idempotent operation. + * Retry before throwing exceptions. + * But this function will not throw the NodeExist exception back to the + * application. + * </p> + * <p> + * But SEQUENTIAL is NOT idempotent operation. It is necessary to add + * identifier to the path to verify, whether the previous one is successful + * or not. 
+ * </p> + * + * @return Path + */ + public String create(String path, byte[] data, List<ACL> acl, + CreateMode createMode) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) { + byte[] newData = ZKMetadata.appendMetaData(id, data); + switch (createMode) { + case EPHEMERAL: + case PERSISTENT: + return createNonSequential(path, newData, acl, createMode); + + case EPHEMERAL_SEQUENTIAL: + case PERSISTENT_SEQUENTIAL: + return createSequential(path, newData, acl, createMode); + + default: + throw new IllegalArgumentException("Unrecognized CreateMode: " + + createMode); + } + } + } + + private String createNonSequential(String path, byte[] data, List<ACL> acl, + CreateMode createMode) throws KeeperException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + boolean isRetry = false; // False for first attempt, true for all retries. + long startTime; + while (true) { + try { + startTime = EnvironmentEdgeManager.currentTime(); + String nodePath = checkZk().create(path, data, acl, createMode); + this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return nodePath; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case NODEEXISTS: + if (isRetry) { + // If the connection was lost, there is still a possibility that + // we have successfully created the node at our previous attempt, + // so we read the node and compare. 
+ startTime = EnvironmentEdgeManager.currentTime(); + byte[] currentData = checkZk().getData(path, false, null); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + if (currentData != null && + Bytes.compareTo(currentData, data) == 0) { + // We successfully created a non-sequential node + return path; + } + LOG.error("Node " + path + " already exists with " + + Bytes.toStringBinary(currentData) + ", could not write " + + Bytes.toStringBinary(data)); + throw e; + } + LOG.debug("Node " + path + " already exists"); + throw e; + + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "create"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "create"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + isRetry = true; + } + } + + private String createSequential(String path, byte[] data, + List<ACL> acl, CreateMode createMode) + throws KeeperException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + boolean first = true; + String newPath = path+this.identifier; + while (true) { + try { + if (!first) { + // Check if we succeeded on a previous attempt + String previousResult = findPreviousSequentialNode(newPath); + if (previousResult != null) { + return previousResult; + } + } + first = false; + long startTime = EnvironmentEdgeManager.currentTime(); + String nodePath = checkZk().create(newPath, data, acl, createMode); + this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return nodePath; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "create"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); 
+ retryOrThrow(retryCounter, e, "create"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + /** + * Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op + * instances to actually pass to multi (need to do this in order to appendMetaData). + */ + private Iterable<Op> prepareZKMulti(Iterable<Op> ops) + throws UnsupportedOperationException { + if(ops == null) return null; + + List<Op> preparedOps = new LinkedList<>(); + for (Op op : ops) { + if (op.getType() == ZooDefs.OpCode.create) { + CreateRequest create = (CreateRequest)op.toRequestRecord(); + preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()), + create.getAcl(), create.getFlags())); + } else if (op.getType() == ZooDefs.OpCode.delete) { + // no need to appendMetaData for delete + preparedOps.add(op); + } else if (op.getType() == ZooDefs.OpCode.setData) { + SetDataRequest setData = (SetDataRequest)op.toRequestRecord(); + preparedOps.add(Op.setData(setData.getPath(), ZKMetadata.appendMetaData(id, setData.getData()), + setData.getVersion())); + } else { + throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName()); + } + } + return preparedOps; + } + + /** + * Run multiple operations in a transactional manner. 
Retry before throwing exception + */ + public List<OpResult> multi(Iterable<Op> ops) + throws KeeperException, InterruptedException { + try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) { + RetryCounter retryCounter = retryCounterFactory.create(); + Iterable<Op> multiOps = prepareZKMulti(ops); + while (true) { + try { + long startTime = EnvironmentEdgeManager.currentTime(); + List<OpResult> opResults = checkZk().multi(multiOps); + this.metrics.registerWriteOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + return opResults; + } catch (KeeperException e) { + this.metrics.registerFailedZKCall(); + switch (e.code()) { + case CONNECTIONLOSS: + this.metrics.registerConnectionLossException(); + retryOrThrow(retryCounter, e, "multi"); + break; + case OPERATIONTIMEOUT: + this.metrics.registerOperationTimeoutException(); + retryOrThrow(retryCounter, e, "multi"); + break; + + default: + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + } + + private String findPreviousSequentialNode(String path) + throws KeeperException, InterruptedException { + int lastSlashIdx = path.lastIndexOf('/'); + assert(lastSlashIdx != -1); + String parent = path.substring(0, lastSlashIdx); + String nodePrefix = path.substring(lastSlashIdx+1); + long startTime = EnvironmentEdgeManager.currentTime(); + List<String> nodes = checkZk().getChildren(parent, false); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + List<String> matching = filterByPrefix(nodes, nodePrefix); + for (String node : matching) { + String nodePath = parent + "/" + node; + startTime = EnvironmentEdgeManager.currentTime(); + Stat stat = checkZk().exists(nodePath, false); + this.metrics.registerReadOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + if (stat != null) { + return nodePath; + } + } + return null; + } + + public synchronized long getSessionId() { + return zk 
== null ? -1 : zk.getSessionId(); + } + + public synchronized void close() throws InterruptedException { + if (zk != null) zk.close(); + } + + public synchronized States getState() { + return zk == null ? null : zk.getState(); + } + + public synchronized ZooKeeper getZooKeeper() { + return zk; + } + + public synchronized byte[] getSessionPasswd() { + return zk == null ? null : zk.getSessionPasswd(); + } + + public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException { + long startTime = EnvironmentEdgeManager.currentTime(); + checkZk().sync(path, cb, null); + this.metrics.registerSyncOperationLatency(Math.min(EnvironmentEdgeManager.currentTime() - startTime, 1)); + } + + /** + * Filters the given node list by the given prefixes. + * This method is all-inclusive--if any element in the node list starts + * with any of the given prefixes, then it is included in the result. + * + * @param nodes the nodes to filter + * @param prefixes the prefixes to include in the result + * @return list of every element that starts with one of the prefixes + */ + private static List<String> filterByPrefix(List<String> nodes, + String... 
prefixes) { + List<String> lockChildren = new ArrayList<>(); + for (String child : nodes){ + for (String prefix : prefixes){ + if (child.startsWith(prefix)){ + lockChildren.add(child); + break; + } + } + } + return lockChildren; + } + + public String getIdentifier() { + return identifier; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java new file mode 100644 index 0000000..93545ee --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionNormalizerTracker.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionNormalizerProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; + +/** + * Tracks region normalizer state up in ZK + */ +public class RegionNormalizerTracker extends ZKNodeTracker { + private static final Log LOG = LogFactory.getLog(RegionNormalizerTracker.class); + + public RegionNormalizerTracker(ZKWatcher watcher, + Abortable abortable) { + super(watcher, watcher.znodePaths.regionNormalizerZNode, abortable); + } + + /** + * Return true if region normalizer is on, false otherwise + */ + public boolean isNormalizerOn() { + byte [] upData = super.getData(false); + try { + // if data in ZK is null, use default of on. + return upData == null || parseFrom(upData).getNormalizerOn(); + } catch (DeserializationException dex) { + LOG.error("ZK state for RegionNormalizer could not be parsed " + + Bytes.toStringBinary(upData)); + // return false to be safe. 
+ return false; + } + } + + /** + * Set region normalizer on/off + * @param normalizerOn whether normalizer should be on or off + * @throws KeeperException + */ + public void setNormalizerOn(boolean normalizerOn) throws KeeperException { + byte [] upData = toByteArray(normalizerOn); + try { + ZKUtil.setData(watcher, watcher.znodePaths.regionNormalizerZNode, upData); + } catch(KeeperException.NoNodeException nne) { + ZKUtil.createAndWatch(watcher, watcher.znodePaths.regionNormalizerZNode, upData); + } + super.nodeDataChanged(watcher.znodePaths.regionNormalizerZNode); + } + + private byte [] toByteArray(boolean isNormalizerOn) { + RegionNormalizerProtos.RegionNormalizerState.Builder builder = + RegionNormalizerProtos.RegionNormalizerState.newBuilder(); + builder.setNormalizerOn(isNormalizerOn); + return ProtobufUtil.prependPBMagic(builder.build().toByteArray()); + } + + private RegionNormalizerProtos.RegionNormalizerState parseFrom(byte [] pbBytes) + throws DeserializationException { + ProtobufUtil.expectPBMagicPrefix(pbBytes); + RegionNormalizerProtos.RegionNormalizerState.Builder builder = + RegionNormalizerProtos.RegionNormalizerState.newBuilder(); + try { + int magicLen = ProtobufUtil.lengthOfPBMagic(); + ProtobufUtil.mergeFrom(builder, pbBytes, magicLen, pbBytes.length - magicLen); + } catch (IOException e) { + throw new DeserializationException(e); + } + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e0c4f374/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java new file mode 100644 index 0000000..4150f54 --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAclReset.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache 
Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.zookeeper;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

/**
 * Command-line tool that rewrites the ACLs of every znode under the HBase base znode.
 *
 * Without options the ACLs are erased (set to OPEN_ACL_UNSAFE); with -set-acls they are
 * set to whatever ZKUtil.createACL returns for each znode.
 *
 * You may add the jaas.conf option
 *    -Djava.security.auth.login.config=/PATH/jaas.conf
 *
 * You may also specify -D to set options
 *    "hbase.zookeeper.quorum"    (it should be in hbase-site.xml)
 *    "zookeeper.znode.parent"    (it should be in hbase-site.xml)
 */
@InterfaceAudience.Private
public class ZKAclReset extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(ZKAclReset.class);

  /**
   * Applies the requested ACLs to {@code znode} after first doing the same, recursively, for
   * each of its children.
   *
   * @param zkw watcher used to list children and to reach the raw ZooKeeper handle
   * @param znode the znode at the root of the subtree to process
   * @param eraseAcls true to open the ACLs up, false to apply the ACLs from ZKUtil.createACL
   */
  private static void resetAcls(final ZKWatcher zkw, final String znode,
      final boolean eraseAcls) throws Exception {
    final List<String> childNames = ZKUtil.listChildrenNoWatch(zkw, znode);
    if (childNames != null) {
      for (String childName : childNames) {
        final String childZNode = ZNodePaths.joinZNode(znode, childName);
        resetAcls(zkw, childZNode, eraseAcls);
      }
    }

    final ZooKeeper rawZk = zkw.getRecoverableZooKeeper().getZooKeeper();
    if (!eraseAcls) {
      LOG.info(" - set ACLs for " + znode);
      rawZk.setACL(znode, ZKUtil.createACL(zkw, znode, true), -1);
      return;
    }
    LOG.info(" - erase ACLs for " + znode);
    rawZk.setACL(znode, ZooDefs.Ids.OPEN_ACL_UNSAFE, -1);
  }

  /**
   * Opens a watcher for the given configuration, processes the whole tree under the base
   * znode and closes the watcher again.
   *
   * @param conf configuration used to build the ZKWatcher
   * @param eraseAcls true to erase the ACLs, false to set them
   */
  private static void resetAcls(final Configuration conf, boolean eraseAcls)
      throws Exception {
    final ZKWatcher watcher = new ZKWatcher(conf, "ZKAclReset", null);
    try {
      final String action = eraseAcls ? "Erase" : "Set";
      LOG.info(action + " HBase ACLs for " +
                watcher.getQuorum() + " " + watcher.znodePaths.baseZNode);
      resetAcls(watcher, watcher.znodePaths.baseZNode, eraseAcls);
    } finally {
      watcher.close();
    }
  }

  /**
   * Writes the usage text to stderr and terminates the JVM with exit status 1.
   */
  private void printUsageAndExit() {
    final String toolName = getClass().getName();
    System.err.printf("Usage: hbase %s [options]%n", toolName);
    System.err.println(" where [options] are:");
    System.err.println(" -h|-help Show this help and exit.");
    System.err.println(" -set-acls Setup the hbase znode ACLs for a secure cluster");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To reset the ACLs to the unsecure cluster behavior:");
    System.err.println(" hbase " + toolName);
    System.err.println();
    System.err.println(" To reset the ACLs to the secure cluster behavior:");
    System.err.println(" hbase " + toolName + " -set-acls");
    System.exit(1);
  }

  /**
   * Parses the arguments and runs the reset; erasing is the default.
   *
   * @param args command-line arguments; only -set-acls changes the mode, anything else
   *        (including -help) prints the usage and exits
   * @return 0 once the reset has completed
   */
  @Override
  public int run(String[] args) throws Exception {
    boolean eraseAcls = true;

    for (String arg : args) {
      switch (arg) {
        case "-set-acls":
          eraseAcls = false;
          break;
        case "-help":
        default:
          // explicit help request and unknown options both end in the usage text
          printUsageAndExit();
          break;
      }
    }

    resetAcls(getConf(), eraseAcls);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    final int exitCode = ToolRunner.run(HBaseConfiguration.create(), new ZKAclReset(), args);
    System.exit(exitCode);
  }
}