Repository: hbase Updated Branches: refs/heads/master 33f842855 -> ea0731d60
HBASE-11108 Split ZKTable into interface and implementation (Mikhail Antononv) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c5d5a5d1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c5d5a5d1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c5d5a5d1 Branch: refs/heads/master Commit: c5d5a5d1bc4ffab220fa9bf93aaa18fed7d6c8c8 Parents: 33f8428 Author: Michael Stack <[email protected]> Authored: Thu May 22 16:15:35 2014 -0700 Committer: Michael Stack <[email protected]> Committed: Thu May 22 16:15:35 2014 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/CoordinatedStateException.java | 46 +++ .../hadoop/hbase/CoordinatedStateManager.java | 66 ++++ .../apache/hadoop/hbase/TableStateManager.java | 115 +++++++ .../zookeeper/ZKTableStateClientSideReader.java | 168 ++++++++++ .../hbase/zookeeper/ZKTableStateManager.java | 330 +++++++++++++++++++ .../hbase/CoordinatedStateManagerFactory.java | 43 +++ .../consensus/BaseCoordinatedStateManager.java | 55 ++++ .../consensus/ZkCoordinatedStateManager.java | 59 ++++ .../zookeeper/TestZKTableStateManager.java | 114 +++++++ 9 files changed, 996 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateException.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateException.java new file mode 100644 index 0000000..a28a0c1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateException.java @@ -0,0 +1,46 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.HBaseException; + +/** + * Thrown by operations requiring coordination state access or manipulation + * when internal error within coordination engine (or other internal implementation) occurs. 
+ */ [email protected] +@SuppressWarnings("serial") +public class CoordinatedStateException extends HBaseException { + public CoordinatedStateException() { + super(); + } + + public CoordinatedStateException(final String message) { + super(message); + } + + public CoordinatedStateException(final String message, final Throwable t) { + super(message, t); + } + + public CoordinatedStateException(final Throwable t) { + super(t); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java new file mode 100644 index 0000000..2642e29 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Implementations of this interface will keep and return to clients + * implementations of classes providing API to execute + * coordinated operations. This interface is client-side, so it does NOT + * include methods to retrieve the particular interface implementations. + * + * For each coarse-grained area of operations there will be a separate + * interface with implementation, providing API for relevant operations + * requiring coordination. + * + * Property hbase.coordinated.state.manager.class in hbase-site.xml controls + * which provider to use. + */ [email protected] +public interface CoordinatedStateManager { + + /** + * Initialize coordinated state management service. + * @param server server instance to run within. + */ + void initialize(Server server); + + /** + * Starts service. + */ + void start(); + + /** + * Stops service. + */ + void stop(); + + /** + * @return instance of Server coordinated state manager runs within + */ + Server getServer(); + + /** + * Returns implementation of TableStateManager. 
+ * @throws InterruptedException if operation is interrupted + * @throws CoordinatedStateException if error happens in underlying coordination mechanism + */ + TableStateManager getTableStateManager() throws InterruptedException, + CoordinatedStateException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java new file mode 100644 index 0000000..56cd4ae --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/TableStateManager.java @@ -0,0 +1,115 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; + +import java.io.InterruptedIOException; +import java.util.Set; + +/** + * Helper class for table state management for operations running inside + * RegionServer or HMaster. 
+ * Depending on implementation, fetches information from HBase system table, + * local data store, ZooKeeper ensemble or somewhere else. + * Code running on client side (with no coordinated state context) shall instead use + * {@link org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader} + */ [email protected] +public interface TableStateManager { + + /** + * Sets the table into desired state. Fails silently if the table is already in this state. + * @param tableName table to process + * @param state new state of this table + * @throws CoordinatedStateException if error happened when trying to set table state + */ + void setTableState(TableName tableName, ZooKeeperProtos.Table.State state) + throws CoordinatedStateException; + + /** + * Sets the specified table into the newState, but only if the table is already in + * one of the given states (otherwise no operation is performed). + * @param tableName table to process + * @param newState new state for the table + * @param states table should be in one of these states for the operation + * to be performed + * @throws CoordinatedStateException if error happened while performing operation + * @return true if operation succeeded, false otherwise + */ + boolean setTableStateIfInStates(TableName tableName, ZooKeeperProtos.Table.State newState, + ZooKeeperProtos.Table.State... states) + throws CoordinatedStateException; + + /** + * Sets the specified table into the newState, but only if the table is NOT in + * one of the given states (otherwise no operation is performed). 
+ * @param tableName table to process + * @param newState new state for the table + * @param states table should NOT be in one of these states for the operation + * to be performed + * @throws CoordinatedStateException if error happened while performing operation + * @return true if operation succeeded, false otherwise + */ + boolean setTableStateIfNotInStates(TableName tableName, ZooKeeperProtos.Table.State newState, + ZooKeeperProtos.Table.State... states) + throws CoordinatedStateException; + + /** + * @return true if the table is in any one of the listed states, false otherwise. + */ + boolean isTableState(TableName tableName, ZooKeeperProtos.Table.State... states); + + /** + * Mark table as deleted. Fails silently if the table is not currently marked as disabled. + * @param tableName table to be deleted + * @throws CoordinatedStateException if error happened while performing operation + */ + void setDeletedTable(TableName tableName) throws CoordinatedStateException; + + /** + * Checks if table is present. + * + * @param tableName table we're checking + * @return true if the table is present, false otherwise + */ + boolean isTablePresent(TableName tableName); + + /** + * @return set of tables which are in any one of the listed states, empty Set if none + */ + Set<TableName> getTablesInStates(ZooKeeperProtos.Table.State... states) + throws InterruptedIOException, CoordinatedStateException; + + /** + * If the table is found in the given state the in-memory state is removed. This + * helps in cases where CreateTable is to be retried by the client in case of + * failures. If deletePermanentState is true - the flag kept permanently is + * also reset. 
+ * + * @param tableName table we're working on + * @param states if table isn't in any one of these states, operation aborts + * @param deletePermanentState if true, reset the permanent flag + * @throws CoordinatedStateException if error happened in underlying coordination engine + */ + void checkAndRemoveTableState(TableName tableName, ZooKeeperProtos.Table.State states, + boolean deletePermanentState) + throws CoordinatedStateException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateClientSideReader.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateClientSideReader.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateClientSideReader.java new file mode 100644 index 0000000..94bd31e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateClientSideReader.java @@ -0,0 +1,168 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper; + +import com.google.protobuf.InvalidProtocolBufferException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.zookeeper.KeeperException; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Non-instantiable class that provides helper functions to learn + * about HBase table state for code running on client side (hence, not having + * access to consensus context). + * + * Doesn't cache any table state, just goes directly to ZooKeeper. + * TODO: decouple this class from ZooKeeper. + */ [email protected] +public class ZKTableStateClientSideReader { + + private ZKTableStateClientSideReader() {} + + /** + * Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#DISABLED}. + * This method does not use cache. + * This method is for clients other than AssignmentManager + * @param zkw ZooKeeperWatcher instance to use + * @param tableName table we're checking + * @return True if table is disabled. + * @throws KeeperException + */ + public static boolean isDisabledTable(final ZooKeeperWatcher zkw, + final TableName tableName) + throws KeeperException, InterruptedException { + ZooKeeperProtos.Table.State state = getTableState(zkw, tableName); + return isTableState(ZooKeeperProtos.Table.State.DISABLED, state); + } + + /** + * Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#ENABLED}. + * This method does not use cache. + * This method is for clients other than AssignmentManager + * @param zkw ZooKeeperWatcher instance to use + * @param tableName table we're checking + * @return True if table is enabled. 
+ * @throws KeeperException + */ + public static boolean isEnabledTable(final ZooKeeperWatcher zkw, + final TableName tableName) + throws KeeperException, InterruptedException { + return getTableState(zkw, tableName) == ZooKeeperProtos.Table.State.ENABLED; + } + + /** + * Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#DISABLING} + * or {@code ZooKeeperProtos.Table.State#DISABLED}. + * This method does not use cache. + * This method is for clients other than AssignmentManager. + * @param zkw ZooKeeperWatcher instance to use + * @param tableName table we're checking + * @return True if table is disabling or disabled. + * @throws KeeperException + */ + public static boolean isDisablingOrDisabledTable(final ZooKeeperWatcher zkw, + final TableName tableName) + throws KeeperException, InterruptedException { + ZooKeeperProtos.Table.State state = getTableState(zkw, tableName); + return isTableState(ZooKeeperProtos.Table.State.DISABLING, state) || + isTableState(ZooKeeperProtos.Table.State.DISABLED, state); + } + + /** + * Gets a list of all the tables set as disabled in zookeeper. + * @return Set of disabled tables, empty Set if none + * @throws KeeperException + */ + public static Set<TableName> getDisabledTables(ZooKeeperWatcher zkw) + throws KeeperException, InterruptedException { + Set<TableName> disabledTables = new HashSet<TableName>(); + List<String> children = + ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode); + for (String child: children) { + TableName tableName = + TableName.valueOf(child); + ZooKeeperProtos.Table.State state = getTableState(zkw, tableName); + if (state == ZooKeeperProtos.Table.State.DISABLED) disabledTables.add(tableName); + } + return disabledTables; + } + + /** + * Gets a list of all the tables set as disabled or disabling in zookeeper. 
+ * @return Set of disabled tables, empty Set if none + * @throws KeeperException + */ + public static Set<TableName> getDisabledOrDisablingTables(ZooKeeperWatcher zkw) + throws KeeperException, InterruptedException { + Set<TableName> disabledTables = new HashSet<TableName>(); + List<String> children = + ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode); + for (String child: children) { + TableName tableName = + TableName.valueOf(child); + ZooKeeperProtos.Table.State state = getTableState(zkw, tableName); + if (state == ZooKeeperProtos.Table.State.DISABLED || + state == ZooKeeperProtos.Table.State.DISABLING) + disabledTables.add(tableName); + } + return disabledTables; + } + + static boolean isTableState(final ZooKeeperProtos.Table.State expectedState, + final ZooKeeperProtos.Table.State currentState) { + return currentState != null && currentState.equals(expectedState); + } + + /** + * @param zkw ZooKeeperWatcher instance to use + * @param tableName table we're checking + * @return Null or {@link ZooKeeperProtos.Table.State} found in znode. 
+ * @throws KeeperException + */ + static ZooKeeperProtos.Table.State getTableState(final ZooKeeperWatcher zkw, + final TableName tableName) + throws KeeperException, InterruptedException { + String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName.getNameAsString()); + byte [] data = ZKUtil.getData(zkw, znode); + if (data == null || data.length <= 0) return null; + try { + ProtobufUtil.expectPBMagicPrefix(data); + ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder(); + int magicLen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.Table t = builder.mergeFrom(data, magicLen, data.length - magicLen).build(); + return t.getState(); + } catch (InvalidProtocolBufferException e) { + KeeperException ke = new KeeperException.DataInconsistencyException(); + ke.initCause(e); + throw ke; + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java new file mode 100644 index 0000000..1aff12f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTableStateManager.java @@ -0,0 +1,330 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.CoordinatedStateException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableStateManager; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.zookeeper.KeeperException; + +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Implementation of TableStateManager which reads, caches and sets state + * up in ZooKeeper. If multiple read/write clients, will make for confusion. + * Code running on client side without consensus context should use + * {@link ZKTableStateClientSideReader} instead. + * + * <p>To save on trips to the zookeeper ensemble, internally we cache table + * state. + */ [email protected] +public class ZKTableStateManager implements TableStateManager { + // A znode will exist under the table directory if it is in any of the + // following states: {@link TableState#ENABLING} , {@link TableState#DISABLING}, + // or {@link TableState#DISABLED}. If {@link TableState#ENABLED}, there will + // be no entry for a table in zk. 
That's how it currently works. + + private static final Log LOG = LogFactory.getLog(ZKTableStateManager.class); + private final ZooKeeperWatcher watcher; + + /** + * Cache of what we found in zookeeper so we don't have to go to zk ensemble + * for every query. Synchronize access rather than use concurrent Map because + * synchronization needs to span query of zk. + */ + private final Map<TableName, ZooKeeperProtos.Table.State> cache = + new HashMap<TableName, ZooKeeperProtos.Table.State>(); + + public ZKTableStateManager(final ZooKeeperWatcher zkw) throws KeeperException, + InterruptedException { + super(); + this.watcher = zkw; + populateTableStates(); + } + + /** + * Populates the cache with the state of each table found in zookeeper. + * @throws KeeperException, InterruptedException + */ + private void populateTableStates() throws KeeperException, InterruptedException { + synchronized (this.cache) { + List<String> children = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.tableZNode); + if (children == null) return; + for (String child: children) { + TableName tableName = TableName.valueOf(child); + ZooKeeperProtos.Table.State state = getTableState(this.watcher, tableName); + if (state != null) this.cache.put(tableName, state); + } + } + } + + /** + * Sets table state in ZK. Sets no watches. + * + * {@inheritDoc} + */ + @Override + public void setTableState(TableName tableName, ZooKeeperProtos.Table.State state) + throws CoordinatedStateException { + synchronized (this.cache) { + LOG.warn("Moving table " + tableName + " state from " + this.cache.get(tableName) + + " to " + state); + try { + setTableStateInZK(tableName, state); + } catch (KeeperException e) { + throw new CoordinatedStateException(e); + } + } + } + + /** + * Checks and sets table state in ZK. Sets no watches. + * {@inheritDoc} + */ + @Override + public boolean setTableStateIfInStates(TableName tableName, + ZooKeeperProtos.Table.State newState, + ZooKeeperProtos.Table.State... 
states) + throws CoordinatedStateException { + synchronized (this.cache) { + // Transition ENABLED->DISABLING has to be performed with a hack, because + // we treat empty state as enabled in this case because 0.92- clusters. + if ( + (newState == ZooKeeperProtos.Table.State.DISABLING) && + this.cache.get(tableName) != null && !isTableState(tableName, states) || + (newState != ZooKeeperProtos.Table.State.DISABLING && + !isTableState(tableName, states) )) { + return false; + } + try { + setTableStateInZK(tableName, newState); + } catch (KeeperException e) { + throw new CoordinatedStateException(e); + } + return true; + } + } + + /** + * Checks and sets table state in ZK. Sets no watches. + * {@inheritDoc} + */ + @Override + public boolean setTableStateIfNotInStates(TableName tableName, + ZooKeeperProtos.Table.State newState, + ZooKeeperProtos.Table.State... states) + throws CoordinatedStateException { + synchronized (this.cache) { + if (isTableState(tableName, states)) { + return false; + } + try { + setTableStateInZK(tableName, newState); + } catch (KeeperException e) { + throw new CoordinatedStateException(e); + } + return true; + } + } + + private void setTableStateInZK(final TableName tableName, + final ZooKeeperProtos.Table.State state) + throws KeeperException { + String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()); + if (ZKUtil.checkExists(this.watcher, znode) == -1) { + ZKUtil.createAndFailSilent(this.watcher, znode); + } + synchronized (this.cache) { + ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder(); + builder.setState(state); + byte [] data = ProtobufUtil.prependPBMagic(builder.build().toByteArray()); + ZKUtil.setData(this.watcher, znode, data); + this.cache.put(tableName, state); + } + } + + /** + * Checks if table is marked in specified state in ZK. + * + * {@inheritDoc} + */ + @Override + public boolean isTableState(final TableName tableName, + final ZooKeeperProtos.Table.State... 
states) { + synchronized (this.cache) { + ZooKeeperProtos.Table.State currentState = this.cache.get(tableName); + return isTableInState(Arrays.asList(states), currentState); + } + } + + /** + * Deletes the table in zookeeper. Fails silently if the + * table is not currently disabled in zookeeper. Sets no watches. + * + * {@inheritDoc} + */ + @Override + public void setDeletedTable(final TableName tableName) + throws CoordinatedStateException { + synchronized (this.cache) { + if (this.cache.remove(tableName) == null) { + LOG.warn("Moving table " + tableName + " state to deleted but was " + + "already deleted"); + } + try { + ZKUtil.deleteNodeFailSilent(this.watcher, + ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString())); + } catch (KeeperException e) { + throw new CoordinatedStateException(e); + } + } + } + + /** + * Checks if table is present. + * + * @param tableName table we're working on + * @return true if the table is present + */ + @Override + public boolean isTablePresent(final TableName tableName) { + synchronized (this.cache) { + ZooKeeperProtos.Table.State state = this.cache.get(tableName); + return !(state == null); + } + } + + /** + * Gets all the tables that are in any one of the given states in zookeeper. + * @return Set of tables in any one of the given states, empty Set if none + * @throws CoordinatedStateException if error happened in underlying coordination engine + */ + @Override + public Set<TableName> getTablesInStates(ZooKeeperProtos.Table.State... 
states) + throws InterruptedIOException, CoordinatedStateException { + try { + return getAllTables(states); + } catch (KeeperException e) { + throw new CoordinatedStateException(e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void checkAndRemoveTableState(TableName tableName, ZooKeeperProtos.Table.State states, + boolean deletePermanentState) + throws CoordinatedStateException { + synchronized (this.cache) { + if (isTableState(tableName, states)) { + this.cache.remove(tableName); + if (deletePermanentState) { + try { + ZKUtil.deleteNodeFailSilent(this.watcher, + ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString())); + } catch (KeeperException e) { + throw new CoordinatedStateException(e); + } + } + } + } + } + + /** + * Gets a list of all the tables of specified states in zookeeper. + * @return Set of tables of specified states, empty Set if none + * @throws KeeperException + */ + Set<TableName> getAllTables(final ZooKeeperProtos.Table.State... states) + throws KeeperException, InterruptedIOException { + + Set<TableName> allTables = new HashSet<TableName>(); + List<String> children = + ZKUtil.listChildrenNoWatch(watcher, watcher.tableZNode); + if(children == null) return allTables; + for (String child: children) { + TableName tableName = TableName.valueOf(child); + ZooKeeperProtos.Table.State state; + try { + state = getTableState(watcher, tableName); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + for (ZooKeeperProtos.Table.State expectedState: states) { + if (state == expectedState) { + allTables.add(tableName); + break; + } + } + } + return allTables; + } + + /** + * Gets table state from ZK. + * @param zkw ZooKeeperWatcher instance to use + * @param tableName table we're checking + * @return Null or {@link ZooKeeperProtos.Table.State} found in znode. 
+ * @throws KeeperException + */ + private ZooKeeperProtos.Table.State getTableState(final ZooKeeperWatcher zkw, + final TableName tableName) + throws KeeperException, InterruptedException { + String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName.getNameAsString()); + byte [] data = ZKUtil.getData(zkw, znode); + if (data == null || data.length <= 0) return null; + try { + ProtobufUtil.expectPBMagicPrefix(data); + ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder(); + int magicLen = ProtobufUtil.lengthOfPBMagic(); + ZooKeeperProtos.Table t = builder.mergeFrom(data, magicLen, data.length - magicLen).build(); + return t.getState(); + } catch (InvalidProtocolBufferException e) { + KeeperException ke = new KeeperException.DataInconsistencyException(); + ke.initCause(e); + throw ke; + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } + } + + /** + * @return true if current state isn't null and is contained + * in the list of expected states. + */ + private boolean isTableInState(final List<ZooKeeperProtos.Table.State> expectedStates, + final ZooKeeperProtos.Table.State currentState) { + return currentState != null && expectedStates.contains(currentState); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java new file mode 100644 index 0000000..77ef217 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManagerFactory.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * Creates instance of {@link CoordinatedStateManager} + * based on configuration. + */ [email protected] +public class CoordinatedStateManagerFactory { + + /** + * Creates consensus provider from the given configuration. + * @param conf Configuration + * @return Implementation of {@link CoordinatedStateManager} + */ + public static CoordinatedStateManager getCoordinatedStateManager(Configuration conf) { + Class<? 
extends CoordinatedStateManager> coordinatedStateMgrKlass = + conf.getClass(HConstants.HBASE_COORDINATED_STATE_MANAGER_CLASS, + ZkCoordinatedStateManager.class, CoordinatedStateManager.class); + return ReflectionUtils.newInstance(coordinatedStateMgrKlass, conf); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java new file mode 100644 index 0000000..7f4e510 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/BaseCoordinatedStateManager.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.consensus; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.CoordinatedStateException; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableStateManager; + +/** + * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations. + * Defines methods to retrieve consensus objects for relevant areas. CoordinatedStateManager + * reference returned from Server interface has to be casted to this type to + * access those methods. + */ +@InterfaceAudience.Private +public abstract class BaseCoordinatedStateManager implements CoordinatedStateManager { + + @Override + public void initialize(Server server) { + } + + @Override + public void start() { + } + + @Override + public void stop() { + } + + @Override + public Server getServer() { + return null; + } + + @Override + public abstract TableStateManager getTableStateManager() throws InterruptedException, + CoordinatedStateException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java new file mode 100644 index 0000000..27e09ca --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/consensus/ZkCoordinatedStateManager.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.consensus; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.CoordinatedStateException; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableStateManager; +import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}. 
+ */ +@InterfaceAudience.Private +public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { + private static final Log LOG = LogFactory.getLog(ZkCoordinatedStateManager.class); + private Server server; + private ZooKeeperWatcher watcher; + + @Override + public void initialize(Server server) { + this.server = server; + this.watcher = server.getZooKeeper(); + } + + @Override + public Server getServer() { + return server; + } + + @Override + public TableStateManager getTableStateManager() throws InterruptedException, + CoordinatedStateException { + try { + return new ZKTableStateManager(server.getZooKeeper()); + } catch (KeeperException e) { + throw new CoordinatedStateException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c5d5a5d1/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java new file mode 100644 index 0000000..f5210cc --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKTableStateManager.java @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.CoordinatedStateException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableStateManager; +import org.apache.zookeeper.KeeperException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table; + +@Category(MediumTests.class) +public class TestZKTableStateManager { + private static final Log LOG = LogFactory.getLog(TestZKTableStateManager.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @Test + public void testTableStates() + throws CoordinatedStateException, IOException, KeeperException, InterruptedException { + final TableName name = + TableName.valueOf("testDisabled"); + Abortable abortable = new Abortable() { + @Override + public void abort(String why, Throwable 
e) { + LOG.info(why, e); + } + + @Override + public boolean isAborted() { + return false; + } + + }; + ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + name.getNameAsString(), abortable, true); + TableStateManager zkt = new ZKTableStateManager(zkw); + assertFalse(zkt.isTableState(name, Table.State.ENABLED)); + assertFalse(zkt.isTableState(name, Table.State.DISABLING)); + assertFalse(zkt.isTableState(name, Table.State.DISABLED)); + assertFalse(zkt.isTableState(name, Table.State.ENABLING)); + assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING)); + assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.ENABLING)); + assertFalse(zkt.isTablePresent(name)); + zkt.setTableState(name, Table.State.DISABLING); + assertTrue(zkt.isTableState(name, Table.State.DISABLING)); + assertTrue(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING)); + assertFalse(zkt.getTablesInStates(Table.State.DISABLED).contains(name)); + assertTrue(zkt.isTablePresent(name)); + zkt.setTableState(name, Table.State.DISABLED); + assertTrue(zkt.isTableState(name, Table.State.DISABLED)); + assertTrue(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING)); + assertFalse(zkt.isTableState(name, Table.State.DISABLING)); + assertTrue(zkt.getTablesInStates(Table.State.DISABLED).contains(name)); + assertTrue(zkt.isTablePresent(name)); + zkt.setTableState(name, Table.State.ENABLING); + assertTrue(zkt.isTableState(name, Table.State.ENABLING)); + assertTrue(zkt.isTableState(name, Table.State.DISABLED, Table.State.ENABLING)); + assertFalse(zkt.isTableState(name, Table.State.DISABLED)); + assertFalse(zkt.getTablesInStates(Table.State.DISABLED).contains(name)); + assertTrue(zkt.isTablePresent(name)); + zkt.setTableState(name, Table.State.ENABLED); + assertTrue(zkt.isTableState(name, Table.State.ENABLED)); + assertFalse(zkt.isTableState(name, Table.State.ENABLING)); + assertTrue(zkt.isTablePresent(name)); + 
zkt.setDeletedTable(name); + assertFalse(zkt.isTableState(name, Table.State.ENABLED)); + assertFalse(zkt.isTableState(name, Table.State.DISABLING)); + assertFalse(zkt.isTableState(name, Table.State.DISABLED)); + assertFalse(zkt.isTableState(name, Table.State.ENABLING)); + assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING)); + assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.ENABLING)); + assertFalse(zkt.isTablePresent(name)); + } +}
