http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml new file mode 100644 index 0000000..858e9fc --- /dev/null +++ b/hbase-replication/pom.xml @@ -0,0 +1,264 @@ +<?xml version="1.0"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>hbase-build-configuration</artifactId> + <groupId>org.apache.hbase</groupId> + <version>3.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <artifactId>hbase-replication</artifactId> + <name>Apache HBase - Replication</name> + <description>HBase Replication Support</description> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <!--Make it so assembly:single does nothing in here--> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <skipAssembly>true</skipAssembly> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <!-- Always skip the second part executions, since we only run + simple unit tests in this module --> + <executions> + <execution> + <id>secondPartTestsExecution</id> + <phase>test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + <!-- Make a jar and put the sources in the jar --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + </plugins> + <pluginManagement> + <plugins> + <!--This plugin's configuration is used to store Eclipse m2e settings + only. 
It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <versionRange>[3.2,)</versionRange> + <goals> + <goal>compile</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + + <dependencies> + <!-- Intra-project dependencies --> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-annotations</artifactId> + <exclusions> + <exclusion> + <groupId>jdk.tools</groupId> + <artifactId>jdk.tools</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-annotations</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <type>test-jar</type> + </dependency> + <!-- General dependencies --> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + <dependency> + 
<groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <!-- profile against Hadoop 2.x: This is the default. --> + <profile> + <id>hadoop-2.0</id> + <activation> + <property> + <!--Below formatting for dev-support/generate-hadoopX-poms.sh--> + <!--h2--><name>!hadoop.profile</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>com.github.stephenc.findbugs</groupId> + <artifactId>findbugs-annotations</artifactId> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <exclusions> + <exclusion> + <groupId>net.java.dev.jets3t</groupId> + <artifactId>jets3t</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet.jsp</groupId> + <artifactId>jsp-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + 
<artifactId>jsr305</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + + <!-- + profile for building against Hadoop 3.0.x. Activate using: + mvn -Dhadoop.profile=3.0 + --> + <profile> + <id>hadoop-3.0</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>3.0</value> + </property> + </activation> + <properties> + <hadoop.version>3.0-SNAPSHOT</hadoop.version> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> +</project>
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java new file mode 100644 index 0000000..8506cbb --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.commons.lang.reflect.ConstructorUtils; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * A factory class for instantiating replication objects that deal with replication state. 
+ */ +@InterfaceAudience.Private +public class ReplicationFactory { + + public static final Class defaultReplicationQueueClass = ReplicationQueuesZKImpl.class; + + public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args) + throws Exception { + Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." + + "replication.replicationQueues.class", defaultReplicationQueueClass); + return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args); + } + + public static ReplicationQueuesClient getReplicationQueuesClient( + ReplicationQueuesClientArguments args) throws Exception { + Class<?> classToBuild = args.getConf().getClass( + "hbase.region.replica.replication.replicationQueuesClient.class", + ReplicationQueuesClientZKImpl.class); + return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args); + } + + public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, + Abortable abortable) { + return getReplicationPeers(zk, conf, null, abortable); + } + + public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf, + final ReplicationQueuesClient queuesClient, Abortable abortable) { + return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable); + } + + public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper, + final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable, + Stoppable stopper) { + return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java 
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java new file mode 100644 index 0000000..dfb5fdc --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * The replication listener interface can be implemented if a class needs to subscribe to events + * generated by the ReplicationTracker. These events include things like addition/deletion of peer + * clusters or failure of a local region server. To receive events, the class also needs to register + * itself with a Replication Tracker. + */ +@InterfaceAudience.Private +public interface ReplicationListener { + + /** + * A region server has been removed from the local cluster + * @param regionServer the removed region server + */ + public void regionServerRemoved(String regionServer); + + /** + * A peer cluster has been removed (i.e. unregistered) from replication. 
+ * @param peerId The peer id of the cluster that has been removed + */ + public void peerRemoved(String peerId); + + /** + * The list of registered peer clusters has changed. + * @param peerIds A list of all currently registered peer clusters + */ + public void peerListChanged(List<String> peerIds); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java new file mode 100644 index 0000000..4f18048 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + + +/** + * ReplicationPeer manages enabled / disabled state for the peer. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public interface ReplicationPeer { + + /** + * State of the peer, whether it is enabled or not + */ + @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) + enum PeerState { + ENABLED, + DISABLED + } + + /** + * Get the identifier of this peer + * @return string representation of the id + */ + String getId(); + + /** + * Get the peer config object + * @return the ReplicationPeerConfig for this peer + */ + public ReplicationPeerConfig getPeerConfig(); + + /** + * Returns the state of the peer + * @return the enabled state + */ + PeerState getPeerState(); + + /** + * Get the configuration object required to communicate with this peer + * @return configuration object + */ + public Configuration getConfiguration(); + + /** + * Get replicable (table, cf-list) map of this peer + * @return the replicable (table, cf-list) map + */ + public Map<TableName, List<String>> getTableCFs(); + + /** + * Get replicable namespace set of this peer + * @return the replicable namespaces set + */ + public Set<String> getNamespaces(); + + /** + * Get the per node bandwidth upper limit for this peer + * @return the bandwidth upper limit + */ + public long getPeerBandwidth(); + + void trackPeerConfigChanges(ReplicationPeerConfigListener listener); + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java ---------------------------------------------------------------------- diff --git 
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java new file mode 100644 index 0000000..4e04186 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) +public interface ReplicationPeerConfigListener { + /** Callback method for when users update the ReplicationPeerConfig for this peer + * + * @param rpc The updated ReplicationPeerConfig + */ + void peerConfigUpdated(ReplicationPeerConfig rpc); + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java new file mode 100644 index 0000000..3973be9 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -0,0 +1,318 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; + +@InterfaceAudience.Private +public class ReplicationPeerZKImpl extends ReplicationStateZKBase + implements ReplicationPeer, Abortable, Closeable { + private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class); + + private ReplicationPeerConfig peerConfig; + private final String id; + private volatile PeerState peerState; + private volatile Map<TableName, List<String>> tableCFs = new HashMap<>(); + private final Configuration conf; + private PeerStateTracker peerStateTracker; + private PeerConfigTracker peerConfigTracker; + + + /** + * Constructor that takes all the objects required to communicate with the specified peer, except + * for the region server addresses. 
+ * @param conf configuration object to this peer + * @param id string representation of this peer's identifier + * @param peerConfig configuration for the replication peer + */ + public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf, + String id, ReplicationPeerConfig peerConfig, + Abortable abortable) + throws ReplicationException { + super(zkWatcher, conf, abortable); + this.conf = conf; + this.peerConfig = peerConfig; + this.id = id; + } + + /** + * start a state tracker to check whether this peer is enabled or not + * + * @param peerStateNode path to zk node which stores peer state + * @throws KeeperException + */ + public void startStateTracker(String peerStateNode) + throws KeeperException { + ensurePeerEnabled(peerStateNode); + this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this); + this.peerStateTracker.start(); + try { + this.readPeerStateZnode(); + } catch (DeserializationException e) { + throw ZKUtil.convert(e); + } + } + + private void readPeerStateZnode() throws DeserializationException { + this.peerState = + isStateEnabled(this.peerStateTracker.getData(false)) + ? 
PeerState.ENABLED + : PeerState.DISABLED; + } + + /** + * start a table-cfs tracker to listen the (table, cf-list) map change + * @param peerConfigNode path to zk node which stores table-cfs + * @throws KeeperException + */ + public void startPeerConfigTracker(String peerConfigNode) + throws KeeperException { + this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper, + this); + this.peerConfigTracker.start(); + this.readPeerConfig(); + } + + private ReplicationPeerConfig readPeerConfig() { + try { + byte[] data = peerConfigTracker.getData(false); + if (data != null) { + this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data); + } + } catch (DeserializationException e) { + LOG.error("", e); + } + return this.peerConfig; + } + + @Override + public PeerState getPeerState() { + return peerState; + } + + /** + * Get the identifier of this peer + * @return string representation of the id (short) + */ + @Override + public String getId() { + return id; + } + + /** + * Get the peer config object + * @return the ReplicationPeerConfig for this peer + */ + @Override + public ReplicationPeerConfig getPeerConfig() { + return peerConfig; + } + + /** + * Get the configuration object required to communicate with this peer + * @return configuration object + */ + @Override + public Configuration getConfiguration() { + return conf; + } + + /** + * Get replicable (table, cf-list) map of this peer + * @return the replicable (table, cf-list) map + */ + @Override + public Map<TableName, List<String>> getTableCFs() { + this.tableCFs = peerConfig.getTableCFsMap(); + return this.tableCFs; + } + + /** + * Get replicable namespace set of this peer + * @return the replicable namespaces set + */ + @Override + public Set<String> getNamespaces() { + return this.peerConfig.getNamespaces(); + } + + @Override + public long getPeerBandwidth() { + return this.peerConfig.getBandwidth(); + } + + @Override + public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { 
+ if (this.peerConfigTracker != null){ + this.peerConfigTracker.setListener(listener); + } + } + + @Override + public void abort(String why, Throwable e) { + LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig + + " was aborted for the following reason(s):" + why, e); + } + + @Override + public boolean isAborted() { + // Currently the replication peer is never "Aborted", we just log when the + // abort method is called. + return false; + } + + @Override + public void close() throws IOException { + // TODO: stop zkw? + } + + /** + * Parse the raw data from ZK to get a peer's state + * @param bytes raw ZK data + * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state. + * @throws DeserializationException + */ + public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { + ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes); + return ReplicationProtos.ReplicationState.State.ENABLED == state; + } + + /** + * @param bytes Content of a state znode. + * @return State parsed from the passed bytes. + * @throws DeserializationException + */ + private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes) + throws DeserializationException { + ProtobufUtil.expectPBMagicPrefix(bytes); + int pblen = ProtobufUtil.lengthOfPBMagic(); + ReplicationProtos.ReplicationState.Builder builder = + ReplicationProtos.ReplicationState.newBuilder(); + ReplicationProtos.ReplicationState state; + try { + ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); + state = builder.build(); + return state.getState(); + } catch (IOException e) { + throw new DeserializationException(e); + } + } + + /** + * Utility method to ensure an ENABLED znode is in place; if not present, we create it. + * @param path Path to znode to check + * @return True if we created the znode. 
+ * @throws NodeExistsException + * @throws KeeperException + */ + private boolean ensurePeerEnabled(final String path) + throws NodeExistsException, KeeperException { + if (ZKUtil.checkExists(zookeeper, path) == -1) { + // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the + // peer-state znode. This happens while adding a peer. + // The peer state data is set as "ENABLED" by default. + ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, + ReplicationStateZKBase.ENABLED_ZNODE_BYTES); + return true; + } + return false; + } + + /** + * Tracker for state of this peer + */ + public class PeerStateTracker extends ZooKeeperNodeTracker { + + public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, peerStateZNode, abortable); + } + + @Override + public synchronized void nodeDataChanged(String path) { + if (path.equals(node)) { + super.nodeDataChanged(path); + try { + readPeerStateZnode(); + } catch (DeserializationException e) { + LOG.warn("Failed deserializing the content of " + path, e); + } + } + } + } + + /** + * Tracker for PeerConfigNode of this peer + */ + public class PeerConfigTracker extends ZooKeeperNodeTracker { + + ReplicationPeerConfigListener listener; + + public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher, + Abortable abortable) { + super(watcher, peerConfigNode, abortable); + } + + public synchronized void setListener(ReplicationPeerConfigListener listener){ + this.listener = listener; + } + + @Override + public synchronized void nodeCreated(String path) { + if (path.equals(node)) { + super.nodeCreated(path); + ReplicationPeerConfig config = readPeerConfig(); + if (listener != null){ + listener.peerConfigUpdated(config); + } + } + } + + @Override + public synchronized void nodeDataChanged(String path) { + //superclass calls nodeCreated + if (path.equals(node)) { + super.nodeDataChanged(path); + } + + } + + } +} 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Pair;

/**
 * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
 * clusters that data is replicated to. A peer cluster can be in three different states:
 *
 * 1. Not-Registered - There is no notion of the peer cluster.
 * 2. Registered - The peer has an id and is being tracked but there is no connection.
 * 3. Connected - There is an active connection to the remote peer.
 *
 * In the registered or connected state, a peer cluster can either be enabled or disabled.
 */
@InterfaceAudience.Private
public interface ReplicationPeers {

  /**
   * Initialize the ReplicationPeers interface.
   * @throws ReplicationException if the peer storage cannot be initialized
   */
  void init() throws ReplicationException;

  /**
   * Add a new remote slave cluster for replication.
   * @param peerId a short that identifies the cluster
   * @param peerConfig configuration for the replication slave cluster
   * @throws ReplicationException if the peer cannot be registered
   */
  void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
      throws ReplicationException;

  /**
   * Removes a remote slave cluster and stops the replication to it.
   * @param peerId a short that identifies the cluster
   * @throws ReplicationException if the peer cannot be unregistered
   */
  void unregisterPeer(String peerId) throws ReplicationException;

  /**
   * Method called after a peer has been connected. It will create a ReplicationPeer to track the
   * newly connected cluster.
   * @param peerId a short that identifies the cluster
   * @return whether a ReplicationPeer was successfully created
   * @throws ReplicationException if the peer cannot be created
   */
  boolean peerConnected(String peerId) throws ReplicationException;

  /**
   * Method called after a peer has been disconnected. It will remove the ReplicationPeer that
   * tracked the disconnected cluster.
   * @param peerId a short that identifies the cluster
   */
  void peerDisconnected(String peerId);

  /**
   * Restart the replication to the specified remote slave cluster.
   * @param peerId a short that identifies the cluster
   * @throws ReplicationException if the peer state cannot be changed
   */
  void enablePeer(String peerId) throws ReplicationException;

  /**
   * Stop the replication to the specified remote slave cluster.
   * @param peerId a short that identifies the cluster
   * @throws ReplicationException if the peer state cannot be changed
   */
  void disablePeer(String peerId) throws ReplicationException;

  /**
   * Get the table and column-family list of the peer from the underlying storage.
   * @param peerId a short that identifies the cluster
   * @return a map of replicated tables to their replicated column families
   * @throws ReplicationException if the config cannot be read
   */
  Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
      throws ReplicationException;

  /**
   * Set the table and column-family list of the peer to the underlying storage.
   * @param peerId a short that identifies the cluster
   * @param tableCFs the table and column-family list which will be replicated for this peer
   * @throws ReplicationException if the config cannot be written
   */
  void setPeerTableCFsConfig(String peerId,
      Map<TableName, ? extends Collection<String>> tableCFs)
      throws ReplicationException;

  /**
   * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
   * continue to track changes to the Peer's state and config. This method returns null if no
   * peer has been connected with the given peerId.
   * @param peerId id for the peer
   * @return ReplicationPeer object, or null if not connected
   */
  ReplicationPeer getConnectedPeer(String peerId);

  /**
   * Returns the set of peerIds of the clusters that have been connected and have an underlying
   * ReplicationPeer.
   * @return a Set of Strings for peerIds
   */
  Set<String> getConnectedPeerIds();

  /**
   * Get the replication status for the specified connected remote slave cluster.
   * The value might be read from cache, so it is recommended to
   * use {@link #getStatusOfPeerFromBackingStore(String)}
   * if reading the state after enabling or disabling it.
   * @param peerId a short that identifies the cluster
   * @return true if replication is enabled, false otherwise.
   */
  boolean getStatusOfPeer(String peerId);

  /**
   * Get the replication status for the specified remote slave cluster, which doesn't
   * have to be connected. The state is read directly from the backing store.
   * @param peerId a short that identifies the cluster
   * @return true if replication is enabled, false otherwise.
   * @throws ReplicationException thrown if there's an error contacting the store
   */
  boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;

  /**
   * List the cluster replication configs of all remote slave clusters (whether they are
   * enabled/disabled or connected/disconnected).
   * @return A map of peer ids to peer cluster keys
   */
  Map<String, ReplicationPeerConfig> getAllPeerConfigs();

  /**
   * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
   * connected/disconnected).
   * @return A list of peer ids
   */
  List<String> getAllPeerIds();

  /**
   * Returns the configured ReplicationPeerConfig for this peerId
   * @param peerId a short name that identifies the cluster
   * @return ReplicationPeerConfig for the peer
   * @throws ReplicationException if the config cannot be read
   */
  ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;

  /**
   * Returns the configuration needed to talk to the remote slave cluster.
   * @param peerId a short that identifies the cluster
   * @return the configuration for the peer cluster, null if it was unable to get the configuration
   * @throws ReplicationException if the config cannot be read
   */
  Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;

  /**
   * Update the peerConfig for the a given peer cluster
   * @param id a short that identifies the cluster
   * @param peerConfig new config for the peer cluster
   * @throws ReplicationException if the config cannot be updated
   */
  void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

/**
 * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
 * peers znode contains a list of all peer replication clusters and the current replication state
 * of those clusters. It has one child peer znode for each peer cluster. The peer znode is named
 * with the cluster id provided by the user in the HBase shell. The value of the peer znode
 * contains the peers cluster key provided by the user in the HBase Shell. The cluster key contains
 * a list of zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode
 * for HBase. For example:
 *
 *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
 *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
 *
 * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
 * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
 * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
 * ReplicationPeer.PeerStateTracker class. For example:
 *
 * /hbase/replication/peers/1/peer-state [Value: ENABLED]
 *
 * Each of these peer znodes has a child znode that indicates which data will be replicated
 * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
 * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
 * class. For example:
 *
 * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
 */
@InterfaceAudience.Private
public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {

  // Map of peer clusters keyed by their id. Declared as ConcurrentMap (rather than Map) so the
  // atomic putIfAbsent/remove(key, value) operations below need no casts.
  private final ConcurrentMap<String, ReplicationPeerZKImpl> peerClusters;
  private final ReplicationQueuesClient queuesClient;
  private final Abortable abortable;

  private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);

  public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
      final ReplicationQueuesClient queuesClient, Abortable abortable) {
    super(zk, conf, abortable);
    this.abortable = abortable;
    this.peerClusters = new ConcurrentHashMap<>();
    this.queuesClient = queuesClient;
  }

  @Override
  public void init() throws ReplicationException {
    try {
      if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
        ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Could not initialize replication peers", e);
    }
    addExistingPeers();
  }

  @Override
  public void registerPeer(String id, ReplicationPeerConfig peerConfig)
      throws ReplicationException {
    try {
      if (peerExists(id)) {
        throw new IllegalArgumentException("Cannot add a peer with id=" + id
            + " because that id already exists.");
      }

      // '-' is reserved: recovered queue znodes are named "peerId-servername-...".
      if (id.contains("-")) {
        throw new IllegalArgumentException("Found invalid peer name:" + id);
      }

      if (peerConfig.getClusterKey() != null) {
        try {
          ZKConfig.validateClusterKey(peerConfig.getClusterKey());
        } catch (IOException ioe) {
          throw new IllegalArgumentException(ioe.getMessage());
        }
      }

      checkQueuesDeleted(id);

      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);

      List<ZKUtilOp> listOfOps = new ArrayList<>(2);
      ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
        ReplicationSerDeHelper.toByteArray(peerConfig));
      // The peer znode and its peer-state child are created in one multi so watchers never see a
      // half-formed peer. The peer state data is set as "ENABLED" by default.
      ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
      listOfOps.add(op1);
      listOfOps.add(op2);
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
    } catch (KeeperException e) {
      // Message typo fixed: was "peerConfif=>".
      throw new ReplicationException("Could not add peer with id=" + id
          + ", peerConfig=>" + peerConfig, e);
    }
  }

  @Override
  public void unregisterPeer(String id) throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("Cannot remove peer with id=" + id
            + " because that id does not exist.");
      }
      ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
    } catch (KeeperException e) {
      throw new ReplicationException("Could not remove peer with id=" + id, e);
    }
  }

  @Override
  public void enablePeer(String id) throws ReplicationException {
    changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
    LOG.info("peer " + id + " is enabled");
  }

  @Override
  public void disablePeer(String id) throws ReplicationException {
    changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
    LOG.info("peer " + id + " is disabled");
  }

  @Override
  public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("peer " + id + " doesn't exist");
      }
      try {
        ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
        if (rpc == null) {
          throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
        }
        return rpc.getTableCFsMap();
      } catch (Exception e) {
        throw new ReplicationException(e);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
    }
  }

  @Override
  public void setPeerTableCFsConfig(String id,
      Map<TableName, ? extends Collection<String>> tableCFs)
      throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
            + " does not exist.");
      }
      ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
      if (rpc == null) {
        throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
      }
      rpc.setTableCFsMap(tableCFs);
      ZKUtil.setData(this.zookeeper, getPeerNode(id),
        ReplicationSerDeHelper.toByteArray(rpc));
      LOG.info("Peer tableCFs with id= " + id + " is now "
          + ReplicationSerDeHelper.convertToString(tableCFs));
    } catch (KeeperException e) {
      throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
    }
  }

  @Override
  public boolean getStatusOfPeer(String id) {
    ReplicationPeer replicationPeer = this.peerClusters.get(id);
    if (replicationPeer == null) {
      throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
    }
    return replicationPeer.getPeerState() == PeerState.ENABLED;
  }

  @Override
  public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("peer " + id + " doesn't exist");
      }
      String peerStateZNode = getPeerStateNode(id);
      try {
        return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
      } catch (KeeperException | DeserializationException e) {
        throw new ReplicationException(e);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Unable to get status of the peer with id=" + id
          + " from backing store", e);
    } catch (InterruptedException e) {
      // Restore the interrupt status so callers higher up the stack still observe it.
      Thread.currentThread().interrupt();
      throw new ReplicationException(e);
    }
  }

  @Override
  public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
    Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
    List<String> ids = null;
    try {
      ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
      for (String id : ids) {
        ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
        if (peerConfig == null) {
          LOG.warn("Failed to get replication peer configuration of clusterid=" + id
              + " znode content, continuing.");
          continue;
        }
        peers.put(id, peerConfig);
      }
    } catch (KeeperException | ReplicationException e) {
      this.abortable.abort("Cannot get the list of peers ", e);
    }
    return peers;
  }

  @Override
  public ReplicationPeer getConnectedPeer(String peerId) {
    return peerClusters.get(peerId);
  }

  @Override
  public Set<String> getConnectedPeerIds() {
    // NOTE: this is a live view of the map's keys, not a snapshot, so it is not thread-safe for
    // iteration while peers connect/disconnect concurrently.
    return peerClusters.keySet();
  }

  /**
   * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
   */
  @Override
  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
      throws ReplicationException {
    String znode = getPeerNode(peerId);
    byte[] data = null;
    try {
      data = ZKUtil.getData(this.zookeeper, znode);
    } catch (InterruptedException e) {
      LOG.warn("Could not get configuration for peer because the thread "
          + "was interrupted. peerId=" + peerId);
      Thread.currentThread().interrupt();
      return null;
    } catch (KeeperException e) {
      throw new ReplicationException("Error getting configuration for peer with id="
          + peerId, e);
    }
    if (data == null) {
      LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
      return null;
    }

    try {
      return ReplicationSerDeHelper.parsePeerFrom(data);
    } catch (DeserializationException e) {
      LOG.warn("Failed to parse cluster key from peerId=" + peerId
          + ", specifically the content from the following znode: " + znode);
      return null;
    }
  }

  @Override
  public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
      throws ReplicationException {
    ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);

    if (peerConfig == null) {
      return null;
    }

    Configuration otherConf;
    try {
      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
    } catch (IOException e) {
      LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
      return null;
    }

    // Overlay any peer-specific configuration entries on top of the cluster configuration.
    if (!peerConfig.getConfiguration().isEmpty()) {
      CompoundConfiguration compound = new CompoundConfiguration();
      compound.add(otherConf);
      compound.addStringMap(peerConfig.getConfiguration());
      return new Pair<>(peerConfig, compound);
    }

    return new Pair<>(peerConfig, otherConf);
  }

  @Override
  public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
      throws ReplicationException {
    ReplicationPeer peer = getConnectedPeer(id);
    if (peer == null) {
      throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
    }
    ReplicationPeerConfig existingConfig = peer.getPeerConfig();
    if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty()
        && !newConfig.getClusterKey().equals(existingConfig.getClusterKey())) {
      throw new ReplicationException("Changing the cluster key on an existing peer is not allowed."
          + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '"
          + newConfig.getClusterKey() + "'");
    }
    String existingEndpointImpl = existingConfig.getReplicationEndpointImpl();
    if (newConfig.getReplicationEndpointImpl() != null
        && !newConfig.getReplicationEndpointImpl().isEmpty()
        && !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)) {
      throw new ReplicationException("Changing the replication endpoint implementation class "
          + "on an existing peer is not allowed. Existing class '"
          + existingConfig.getReplicationEndpointImpl()
          + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
    }
    // Update existingConfig's peer config and peer data with the new values, but don't touch
    // config or data that weren't explicitly changed
    existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
    existingConfig.getPeerData().putAll(newConfig.getPeerData());
    existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
    existingConfig.setNamespaces(newConfig.getNamespaces());
    existingConfig.setBandwidth(newConfig.getBandwidth());

    try {
      ZKUtil.setData(this.zookeeper, getPeerNode(id),
        ReplicationSerDeHelper.toByteArray(existingConfig));
    } catch (KeeperException ke) {
      throw new ReplicationException("There was a problem trying to save changes to the "
          + "replication peer " + id, ke);
    }
  }

  /**
   * List all registered peer clusters and set a watch on their znodes.
   */
  @Override
  public List<String> getAllPeerIds() {
    List<String> ids = null;
    try {
      ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
    } catch (KeeperException e) {
      this.abortable.abort("Cannot get the list of peers ", e);
    }
    return ids;
  }

  /**
   * A private method used during initialization. This method attempts to add all registered
   * peer clusters. This method does not set a watch on the peer cluster znodes.
   */
  private void addExistingPeers() throws ReplicationException {
    List<String> znodes = null;
    try {
      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
    } catch (KeeperException e) {
      throw new ReplicationException("Error getting the list of peer clusters.", e);
    }
    if (znodes != null) {
      for (String z : znodes) {
        createAndAddPeer(z);
      }
    }
  }

  @Override
  public boolean peerConnected(String peerId) throws ReplicationException {
    return createAndAddPeer(peerId);
  }

  @Override
  public void peerDisconnected(String peerId) {
    ReplicationPeer rp = this.peerClusters.get(peerId);
    if (rp != null) {
      // Atomic remove-if-same so a concurrently re-added peer is not dropped by mistake.
      peerClusters.remove(peerId, rp);
    }
  }

  /**
   * Attempt to connect to a new remote slave cluster.
   * @param peerId a short that identifies the cluster
   * @return true if a new connection was made, false if no new connection was made.
   * @throws ReplicationException if the peer could not be created
   */
  public boolean createAndAddPeer(String peerId) throws ReplicationException {
    if (peerClusters == null) {
      return false;
    }
    if (this.peerClusters.containsKey(peerId)) {
      return false;
    }

    ReplicationPeerZKImpl peer = null;
    try {
      peer = createPeer(peerId);
    } catch (Exception e) {
      throw new ReplicationException("Error adding peer with id=" + peerId, e);
    }
    if (peer == null) {
      return false;
    }
    ReplicationPeerZKImpl previous = peerClusters.putIfAbsent(peerId, peer);
    if (previous == null) {
      LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
    } else {
      LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey()
          + ", new cluster=" + peer.getPeerConfig().getClusterKey());
    }
    return true;
  }

  /**
   * Update the state znode of a peer cluster.
   * @param id a short that identifies the cluster
   * @param state the new peer state (ENABLED or DISABLED)
   * @throws ReplicationException if the state cannot be written
   */
  private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
      throws ReplicationException {
    try {
      if (!peerExists(id)) {
        throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
            + " does not exist.");
      }
      String peerStateZNode = getPeerStateNode(id);
      byte[] stateBytes =
          (state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
              : DISABLED_ZNODE_BYTES;
      if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
        ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
      } else {
        ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
      }
      LOG.info("Peer with id= " + id + " is now " + state.name());
    } catch (KeeperException e) {
      throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
    }
  }

  /**
   * Helper method to connect to a peer
   * @param peerId peer's identifier
   * @return object representing the peer, or null if its configuration could not be read
   * @throws ReplicationException if the state/config trackers cannot be started
   */
  private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
    Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
    if (pair == null) {
      return null;
    }
    Configuration peerConf = pair.getSecond();

    ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper,
        peerConf, peerId, pair.getFirst(), abortable);
    try {
      peer.startStateTracker(this.getPeerStateNode(peerId));
    } catch (KeeperException e) {
      throw new ReplicationException("Error starting the peer state tracker for peerId="
          + peerId, e);
    }

    try {
      peer.startPeerConfigTracker(this.getPeerNode(peerId));
    } catch (KeeperException e) {
      throw new ReplicationException("Error starting the peer tableCFs tracker for peerId="
          + peerId, e);
    }

    return peer;
  }

  /**
   * Verifies that no replication queue (WAL or hfile-refs) still exists for the given peer id;
   * registering a peer over leftover queues would replicate stale data.
   * @param peerId the peer id about to be registered
   * @throws ReplicationException if an undeleted queue is found or the check fails
   */
  private void checkQueuesDeleted(String peerId) throws ReplicationException {
    if (queuesClient == null) {
      return;
    }
    try {
      List<String> replicators = queuesClient.getListOfReplicators();
      if (replicators == null || replicators.isEmpty()) {
        return;
      }
      for (String replicator : replicators) {
        List<String> queueIds = queuesClient.getAllQueues(replicator);
        for (String queueId : queueIds) {
          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
          if (queueInfo.getPeerId().equals(peerId)) {
            throw new ReplicationException("undeleted queue for peerId: " + peerId
                + ", replicator: " + replicator + ", queueId: " + queueId);
          }
        }
      }
      // Check for hfile-refs queue
      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
        throw new ReplicationException("Undeleted queue for peerId: " + peerId
            + ", found in hfile-refs node path " + hfileRefsZNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ServerName;

/**
 * This class is responsible for the parsing logic for a znode representing a queue.
 * It will extract the peerId if it's recovered as well as the dead region servers
 * that were part of the queue's history.
 */
@InterfaceAudience.Private
public class ReplicationQueueInfo {
  private static final Log LOG = LogFactory.getLog(ReplicationQueueInfo.class);

  private final String peerId;
  private final String peerClusterZnode;
  private boolean queueRecovered;
  // Dead region servers that previously owned this queue (populated only for recovered queues).
  private List<String> deadRegionServers = new ArrayList<>();

  /**
   * The passed znode will be either the id of the peer cluster or
   * the handling story of that queue in the form of id-servername-*
   */
  public ReplicationQueueInfo(String znode) {
    this.peerClusterZnode = znode;
    String[] pieces = znode.split("-", 2);
    // A lone token means this is a plain (non-recovered) queue named after the peer id.
    this.queueRecovered = pieces.length != 1;
    this.peerId = this.queueRecovered ? pieces[0] : peerClusterZnode;
    if (pieces.length >= 2) {
      // The remainder encodes the chain of dead servers that held this queue.
      extractDeadServersFromZNodeString(pieces[1], this.deadRegionServers);
    }
  }

  /**
   * Parse dead server names from znode string servername can contain "-" such as
   * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
   * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-&lt;server name&gt;-...
   */
  private static void
      extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
    if (deadServerListStr == null || result == null || deadServerListStr.isEmpty()) {
      return;
    }

    // A '-' only delimits server names once we have passed both commas of a full server name
    // (host,port,startcode) — so count commas and only split on '-' after seeing two of them.
    int commasSeen = 0;
    int tokenStart = 0;
    final int length = deadServerListStr.length();

    for (int pos = 0; pos < length; pos++) {
      char ch = deadServerListStr.charAt(pos);
      if (ch == ',') {
        commasSeen++;
      } else if (ch == '-' && commasSeen >= 2) {
        if (pos > tokenStart) {
          String candidate = deadServerListStr.substring(tokenStart, pos);
          if (ServerName.isFullServerName(candidate)) {
            result.add(candidate);
          } else {
            LOG.error("Found invalid server name:" + candidate);
          }
          tokenStart = pos + 1;
        }
        commasSeen = 0;
      }
    }

    // Handle the final token after the last delimiter.
    if (tokenStart < length - 1) {
      String candidate = deadServerListStr.substring(tokenStart, length);
      if (ServerName.isFullServerName(candidate)) {
        result.add(candidate);
      } else {
        LOG.error("Found invalid server name at the end:" + candidate);
      }
    }

    LOG.debug("Found dead servers:" + result);
  }

  public List<String> getDeadRegionServers() {
    return Collections.unmodifiableList(this.deadRegionServers);
  }

  public String getPeerId() {
    return this.peerId;
  }

  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  public boolean isQueueRecovered() {
    return queueRecovered;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.List;
import java.util.SortedSet;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Pair;

/**
 * This provides an interface for maintaining a region server's replication queues. These queues
 * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled)
 * that still need to be replicated to remote clusters.
 */
@InterfaceAudience.Private
public interface ReplicationQueues {

  /**
   * Initialize the region server replication queue interface.
   * @param serverName The server name of the region server that owns the replication queues this
   *          interface manages.
   * @throws ReplicationException if the queue storage cannot be initialized
   */
  void init(String serverName) throws ReplicationException;

  /**
   * Remove a replication queue.
   * @param queueId a String that identifies the queue.
   */
  void removeQueue(String queueId);

  /**
   * Add a new WAL file to the given queue. If the queue does not exist it is created.
   * @param queueId a String that identifies the queue.
   * @param filename name of the WAL
   * @throws ReplicationException if the WAL cannot be added to the queue
   */
  void addLog(String queueId, String filename) throws ReplicationException;

  /**
   * Remove a WAL file from the given queue.
   * @param queueId a String that identifies the queue.
   * @param filename name of the WAL
   */
  void removeLog(String queueId, String filename);

  /**
   * Set the current position for a specific WAL in a given queue.
   * @param queueId a String that identifies the queue
   * @param filename name of the WAL
   * @param position the current position in the file
   */
  void setLogPosition(String queueId, String filename, long position);

  /**
   * Get the current position for a specific WAL in a given queue.
   * @param queueId a String that identifies the queue
   * @param filename name of the WAL
   * @return the current position in the file
   * @throws ReplicationException if the position cannot be read
   */
  long getLogPosition(String queueId, String filename) throws ReplicationException;

  /**
   * Remove all replication queues for this region server.
   */
  void removeAllQueues();

  /**
   * Get a list of all WALs in the given queue.
   * @param queueId a String that identifies the queue
   * @return a list of WALs, null if no such queue exists for this server
   */
  List<String> getLogsInQueue(String queueId);

  /**
   * Get a list of all queues for this region server.
   * @return a list of queueIds, an empty list if this region server is dead and has no
   *         outstanding queues
   */
  List<String> getAllQueues();

  /**
   * Get queueIds from a dead region server, whose queues have not been claimed by other region
   * servers.
   * @param regionserver the id of the dead region server
   * @return empty if the queue exists but no children, null if the queue does not exist.
   */
  List<String> getUnClaimedQueueIds(String regionserver);

  /**
   * Take ownership of the queue identified by queueId that belongs to a dead region server.
   * @param regionserver the id of the dead region server
   * @param queueId the id of the queue
   * @return the new PeerId and A SortedSet of WALs in its queue, and null if no unclaimed queue.
   */
  Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);

  /**
   * Remove the znode of region server if the queue is empty.
   * @param regionserver the id of the region server whose znode may be removed
   */
  void removeReplicatorIfQueueIsEmpty(String regionserver);

  /**
   * Get a list of all region servers that have outstanding replication queues. These servers could
   * be alive, dead or from a previous run of the cluster.
   * @return a list of server names
   */
  List<String> getListOfReplicators();

  /**
   * Checks if the provided znode is the same as this region server's
   * @param regionserver the id of the region server
   * @return if this is this rs's znode
   */
  boolean isThisOurRegionServer(String regionserver);

  /**
   * Add a peer to hfile reference queue if peer does not exist.
   * @param peerId peer cluster id to be added
   * @throws ReplicationException if fails to add a peer id to hfile reference queue
   */
  void addPeerToHFileRefs(String peerId) throws ReplicationException;

  /**
   * Remove a peer from hfile reference queue.
   * @param peerId peer cluster id to be removed
   */
  void removePeerFromHFileRefs(String peerId);

  /**
   * Add new hfile references to the queue.
   * @param peerId peer cluster id to which the hfiles need to be replicated
   * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
   *          will be added in the queue }
   * @throws ReplicationException if fails to add a hfile reference
   */
  void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;

  /**
   * Remove hfile references from the queue.
   * @param peerId peer cluster id from which this hfile references needs to be removed
   * @param files list of hfile references to be removed
   */
  void removeHFileRefs(String peerId, List<String> files);
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various + * ReplicationQueues Implementations with different constructor arguments by reflection. + */ +@InterfaceAudience.Private +public class ReplicationQueuesArguments { + + private ZooKeeperWatcher zk; + private Configuration conf; + private Abortable abort; + + public ReplicationQueuesArguments(Configuration conf, Abortable abort) { + this.conf = conf; + this.abort = abort; + } + + public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) { + this(conf, abort); + setZk(zk); + } + + public ZooKeeperWatcher getZk() { + return zk; + } + + public void setZk(ZooKeeperWatcher zk) { + this.zk = zk; + } + + public Configuration getConf() { + return conf; + } + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Abortable getAbortable() { + return abort; + } + + public void setAbortable(Abortable abort) { + this.abort = abort; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java new file mode 100644 index 0000000..6d8900e --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -0,0 +1,93 @@ +/* + * 
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;

/**
 * This provides an interface for clients of replication to view replication queues. These queues
 * keep track of the sources(WALs/HFile references) that still need to be replicated to remote
 * clusters.
 */
@InterfaceAudience.Private
public interface ReplicationQueuesClient {

  /**
   * Initialize the replication queue client interface.
   * @throws ReplicationException if the underlying queue storage could not be initialized
   */
  void init() throws ReplicationException;

  /**
   * Get a list of all region servers that have outstanding replication queues. These servers could
   * be alive, dead or from a previous run of the cluster.
   * @return a list of server names
   * @throws KeeperException zookeeper exception
   */
  List<String> getListOfReplicators() throws KeeperException;

  /**
   * Get a list of all WALs in the given queue on the given region server.
   * @param serverName the server name of the region server that owns the queue
   * @param queueId a String that identifies the queue
   * @return a list of WALs, null if this region server is dead and has no outstanding queues
   * @throws KeeperException zookeeper exception
   */
  List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;

  /**
   * Get a list of all queues for the specified region server.
   * @param serverName the server name of the region server that owns the set of queues
   * @return a list of queueIds, null if this region server is not a replicator.
   * @throws KeeperException zookeeper exception
   */
  List<String> getAllQueues(String serverName) throws KeeperException;

  /**
   * Load all wals in all replication queues from ZK. This method guarantees to return a
   * snapshot which contains all WALs in the zookeeper at the start of this call even there
   * is concurrent queue failover. However, some newly created WALs during the call may
   * not be included.
   * @return a set of all WAL names currently referenced by any replication queue
   * @throws KeeperException zookeeper exception
   */
  Set<String> getAllWALs() throws KeeperException;

  /**
   * Get the change version number of replication hfile references node. This can be used as
   * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
   * @return change version number of hfile references node
   * @throws KeeperException zookeeper exception
   */
  int getHFileRefsNodeChangeVersion() throws KeeperException;

  /**
   * Get list of all peers from hfile reference queue.
   * @return a list of peer ids
   * @throws KeeperException zookeeper exception
   */
  List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;

  /**
   * Get a list of all hfile references in the given peer.
   * @param peerId a String that identifies the peer
   * @return a list of hfile references, null if none found
   * @throws KeeperException zookeeper exception
   */
  List<String> getReplicableHFiles(String peerId) throws KeeperException;
}
+ */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; + +/** + * Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct + * various ReplicationQueuesClient Implementations with different constructor arguments by + * reflection. + */ +@InterfaceAudience.Private +public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments { + public ReplicationQueuesClientArguments(Configuration conf, Abortable abort, + ZooKeeperWatcher zk) { + super(conf, abort, zk); + } + public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) { + super(conf, abort); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java new file mode 100644 index 0000000..1981131 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

/**
 * ZooKeeper-based implementation of {@link ReplicationQueuesClient}. Reads queue and hfile
 * reference state from the replication znodes (paths such as {@code queuesZNode} and
 * {@code hfileRefsZNode} are inherited from ReplicationStateZKBase). On any ZooKeeper failure
 * the configured Abortable is invoked and the KeeperException is rethrown to the caller.
 */
@InterfaceAudience.Private
public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
    ReplicationQueuesClient {

  // One logger per class, not per instance: was a package-visible, non-final instance field.
  private static final Log LOG = LogFactory.getLog(ReplicationQueuesClientZKImpl.class);

  public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
    this(args.getZk(), args.getConf(), args.getAbortable());
  }

  public ReplicationQueuesClientZKImpl(final ZooKeeperWatcher zk, Configuration conf,
      Abortable abortable) {
    super(zk, conf, abortable);
  }

  /**
   * Create the queues znode (with parents) if it does not already exist.
   * @throws ReplicationException wrapping any ZooKeeper failure
   */
  @Override
  public void init() throws ReplicationException {
    try {
      if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
        ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Internal error while initializing a queues client", e);
    }
  }

  @Override
  public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
    String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
    znode = ZKUtil.joinZNode(znode, queueId);
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of wals for queueId=" + queueId
          + " and serverName=" + serverName, e);
      throw e;
    }
    return result;
  }

  @Override
  public List<String> getAllQueues(String serverName) throws KeeperException {
    String znode = ZKUtil.joinZNode(this.queuesZNode, serverName);
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
      throw e;
    }
    return result;
  }

  /**
   * Load all wals in all replication queues from ZK. This method guarantees to return a
   * snapshot which contains all WALs in the zookeeper at the start of this call even there
   * is concurrent queue failover. However, some newly created WALs during the call may
   * not be included. Consistency is checked by re-reading the queues znode cversion: the
   * scan retries until the cversion is unchanged across a full pass.
   */
  @Override
  public Set<String> getAllWALs() throws KeeperException {
    for (int retry = 0; ; retry++) {
      int v0 = getQueuesZNodeCversion();
      List<String> rss = getListOfReplicators();
      if (rss == null || rss.isEmpty()) {
        LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
        return ImmutableSet.of();
      }
      Set<String> wals = Sets.newHashSet();
      for (String rs : rss) {
        List<String> listOfPeers = getAllQueues(rs);
        // if rs just died, this will be null
        if (listOfPeers == null) {
          continue;
        }
        for (String id : listOfPeers) {
          List<String> peersWals = getLogsInQueue(rs, id);
          if (peersWals != null) {
            wals.addAll(peersWals);
          }
        }
      }
      int v1 = getQueuesZNodeCversion();
      if (v0 == v1) {
        return wals;
      }
      LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
          v0, v1, retry));
    }
  }

  /**
   * @return the child-change version (cversion) of the queues znode, used by
   *         {@link #getAllWALs()} to detect concurrent queue failover.
   */
  public int getQueuesZNodeCversion() throws KeeperException {
    try {
      Stat stat = new Stat();
      ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
      return stat.getCversion();
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get stat of replication rs node", e);
      throw e;
    }
  }

  @Override
  public int getHFileRefsNodeChangeVersion() throws KeeperException {
    Stat stat = new Stat();
    try {
      ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get stat of replication hfile references node.", e);
      throw e;
    }
    return stat.getCversion();
  }

  @Override
  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
      throw e;
    }
    return result;
  }

  @Override
  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
    String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
    List<String> result = null;
    try {
      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
    } catch (KeeperException e) {
      this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
      throw e;
    }
    return result;
  }
}