Repository: hbase
Updated Branches:
  refs/heads/branch-1 e51584381 -> d87b05f04


HBASE-16336 Removing peers seems to be leaving spare queues (Guanghao Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d87b05f0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d87b05f0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d87b05f0

Branch: refs/heads/branch-1
Commit: d87b05f040608e7ffa7908246d914d5ff1353943
Parents: e515843
Author: tedyu <[email protected]>
Authored: Sun Dec 11 07:43:10 2016 -0800
Committer: tedyu <[email protected]>
Committed: Sun Dec 11 07:43:10 2016 -0800

----------------------------------------------------------------------
 .../replication/ReplicationQueuesZKImpl.java    |   2 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  11 +
 .../cleaner/ReplicationZKNodeCleaner.java       | 210 +++++++++++++++++++
 .../cleaner/ReplicationZKNodeCleanerChore.java  |  55 +++++
 .../hbase/util/hbck/ReplicationChecker.java     | 141 +++----------
 .../cleaner/TestReplicationZKNodeCleaner.java   | 113 ++++++++++
 6 files changed, 418 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d87b05f0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 4a4c8cf..a903159 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -69,7 +69,7 @@ public class ReplicationQueuesZKImpl extends 
ReplicationStateZKBase implements R
   /** Znode containing all replication queues for this region server. */
   private String myQueuesZnode;
   /** Name of znode we use to lock during failover */
-  private final static String RS_LOCK_ZNODE = "lock";
+  public final static String RS_LOCK_ZNODE = "lock";
 
   private static final Log LOG = 
LogFactory.getLog(ReplicationQueuesZKImpl.class);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d87b05f0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index cf54397..b68ca55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -99,6 +99,8 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
@@ -324,6 +326,7 @@ public class HMaster extends HRegionServer implements 
MasterServices, Server {
 
   CatalogJanitor catalogJanitorChore;
   private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
+  private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
   private ReplicationMetaCleaner replicationMetaCleaner;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
@@ -1183,6 +1186,13 @@ public class HMaster extends HRegionServer implements 
MasterServices, Server {
       }
     }
     try {
+      replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, 
cleanerInterval,
+          new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
+      getChoreService().scheduleChore(replicationZKNodeCleanerChore);
+    } catch (Exception e) {
+      LOG.error("start replicationZKNodeCleanerChore failed", e);
+    }
+    try {
       replicationMetaCleaner = new ReplicationMetaCleaner(this, this, 
cleanerInterval);
       getChoreService().scheduleChore(replicationMetaCleaner);
     } catch (Exception e) {
@@ -1222,6 +1232,7 @@ public class HMaster extends HRegionServer implements 
MasterServices, Server {
     if (this.logCleaner != null) this.logCleaner.cancel(true);
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
     if (this.replicationZKLockCleanerChore != null) 
this.replicationZKLockCleanerChore.cancel(true);
+    if (this.replicationZKNodeCleanerChore != null) 
this.replicationZKNodeCleanerChore.cancel(true);
     if (this.replicationMetaCleaner != null) 
this.replicationMetaCleaner.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
     if (this.activeMasterManager != null) this.activeMasterManager.stop();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d87b05f0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
new file mode 100644
index 0000000..8311b8d
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Used to clean the replication queues belonging to the peer which does not 
exist.
+ */
+@InterfaceAudience.Private
+public class ReplicationZKNodeCleaner {
+  private static final Log LOG = 
LogFactory.getLog(ReplicationZKNodeCleaner.class);
+  private final ZooKeeperWatcher zkw;
+  private final ReplicationQueuesClient queuesClient;
+  private final ReplicationPeers replicationPeers;
+  private final ReplicationQueueDeletor queueDeletor;
+  private final boolean useMulti;
+
+  public ReplicationZKNodeCleaner(Configuration conf, ZooKeeperWatcher zkw, 
Abortable abortable)
+      throws IOException {
+    try {
+      this.zkw = zkw;
+      this.queuesClient = ReplicationFactory
+          .getReplicationQueuesClient(zkw, conf, abortable);
+      this.queuesClient.init();
+      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, 
conf, this.queuesClient,
+        abortable);
+      this.replicationPeers.init();
+      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
+      this.useMulti = conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
+    } catch (Exception e) {
+      throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
+    }
+  }
+
+  /**
+   * @return undeletedQueues replicator with its queueIds for removed peers
+   * @throws IOException
+   */
+  public Map<String, List<String>> getUnDeletedQueues() throws IOException {
+    Map<String, List<String>> undeletedQueues = new HashMap<>();
+    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
+    try {
+      List<String> replicators = this.queuesClient.getListOfReplicators();
+      for (String replicator : replicators) {
+        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
+        for (String queueId : queueIds) {
+          if (!useMulti && 
queueId.equals(ReplicationQueuesZKImpl.RS_LOCK_ZNODE)) {
+            continue;
+          }
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (!peerIds.contains(queueInfo.getPeerId())) {
+            if (!undeletedQueues.containsKey(replicator)) {
+              undeletedQueues.put(replicator, new ArrayList<String>());
+            }
+            undeletedQueues.get(replicator).add(queueId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Undeleted replication queue for removed peer found: "
+                  + String.format("[removedPeerId=%s, replicator=%s, 
queueId=%s]",
+                    queueInfo.getPeerId(), replicator, queueId));
+            }
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      throw new IOException("Failed to get the replication queues of all 
replicators", ke);
+    }
+    return undeletedQueues;
+  }
+
+  /**
+   * @return undeletedHFileRefsQueue replicator with its undeleted queueIds 
for removed peers in
+   *         hfile-refs queue
+   * @throws IOException
+   */
+  public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
+    Set<String> undeletedHFileRefsQueue = new HashSet<>();
+    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
+    String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
+    try {
+      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
+        return null;
+      }
+      List<String> listOfPeers = 
this.queuesClient.getAllPeersFromHFileRefsQueue();
+      Set<String> peers = new HashSet<>(listOfPeers);
+      peers.removeAll(peerIds);
+      if (!peers.isEmpty()) {
+        undeletedHFileRefsQueue.addAll(peers);
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Failed to get list of all peers from hfile-refs 
znode "
+          + hfileRefsZNode, e);
+    }
+    return undeletedHFileRefsQueue;
+  }
+
+  private class ReplicationQueueDeletor extends ReplicationStateZKBase {
+
+    public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, 
Abortable abortable) {
+      super(zk, conf, abortable);
+    }
+
+    /**
+     * @param replicator The regionserver which has undeleted queue
+     * @param queueId The undeleted queue id
+     * @throws IOException
+     */
+    public void removeQueue(final String replicator, final String queueId) 
throws IOException {
+      String queueZnodePath = 
ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
+        queueId);
+      try {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) 
{
+          ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
+          LOG.info("Successfully removed replication queue, replicator: " + 
replicator
+              + ", queueId: " + queueId);
+        }
+      } catch (KeeperException e) {
+        throw new IOException("Failed to delete queue, replicator: " + 
replicator + ", queueId: "
+            + queueId);
+      }
+    }
+
+    /**
+     * @param hfileRefsQueueId The undeleted hfile-refs queue id
+     * @throws IOException
+     */
+    public void removeHFileRefsQueue(final String hfileRefsQueueId) throws 
IOException {
+      String node = ZKUtil.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
+      try {
+        if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
+          ZKUtil.deleteNodeRecursively(this.zookeeper, node);
+          LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId 
+ " from path "
+              + hfileRefsZNode);
+        }
+      } catch (KeeperException e) {
+        throw new IOException("Failed to delete hfile-refs queue " + 
hfileRefsQueueId
+            + " from path " + hfileRefsZNode);
+      }
+    }
+
+    String getHfileRefsZNode() {
+      return this.hfileRefsZNode;
+    }
+  }
+
+  /**
+   * Remove the undeleted replication queue's zk node for removed peers.
+   * @param undeletedQueues replicator with its queueIds for removed peers
+   * @throws IOException
+   */
+  public void removeQueues(final Map<String, List<String>> undeletedQueues) 
throws IOException {
+    for (Entry<String, List<String>> replicatorAndQueueIds : 
undeletedQueues.entrySet()) {
+      String replicator = replicatorAndQueueIds.getKey();
+      for (String queueId : replicatorAndQueueIds.getValue()) {
+        queueDeletor.removeQueue(replicator, queueId);
+      }
+    }
+  }
+
+  /**
+   * Remove the undeleted hfile-refs queue's zk node for removed peers.
+   * @param undeletedHFileRefsQueues replicator with its undeleted queueIds 
for removed peers in
+   *          hfile-refs queue
+   * @throws IOException
+   */
+  public void removeHFileRefsQueues(final Set<String> 
undeletedHFileRefsQueues) throws IOException {
+    for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
+      queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d87b05f0/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
new file mode 100644
index 0000000..4bc1244
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Chore that will clean the replication queues belonging to the peer which 
does not exist.
+ */
+@InterfaceAudience.Private
+public class ReplicationZKNodeCleanerChore extends ScheduledChore {
+  private static final Log LOG = 
LogFactory.getLog(ReplicationZKNodeCleanerChore.class);
+  private final ReplicationZKNodeCleaner cleaner;
+
+  public ReplicationZKNodeCleanerChore(Stoppable stopper, int period,
+      ReplicationZKNodeCleaner cleaner) {
+    super("ReplicationZKNodeCleanerChore", stopper, period);
+    this.cleaner = cleaner;
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+      cleaner.removeQueues(undeletedQueues);
+    } catch (IOException e) {
+      LOG.warn("Failed to clean replication zk node", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d87b05f0/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 64212c9..dcb0010 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.util.hbck;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -27,159 +26,75 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
 
 /*
  * Check and fix undeleted replication queues for removed peerId.
  */
 @InterfaceAudience.Private
 public class ReplicationChecker {
-  private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
-  private final ZooKeeperWatcher zkw;
-  private ErrorReporter errorReporter;
-  private ReplicationQueuesClient queuesClient;
-  private ReplicationPeers replicationPeers;
-  private ReplicationQueueDeletor queueDeletor;
+  private final ErrorReporter errorReporter;
   // replicator with its queueIds for removed peers
   private Map<String, List<String>> undeletedQueueIds = new HashMap<String, 
List<String>>();
   // Set of un deleted hfile refs queue Ids
   private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
-  private final String hfileRefsZNode;
+  private final ReplicationZKNodeCleaner cleaner;
 
   public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, 
HConnection connection,
       ErrorReporter errorReporter) throws IOException {
-    try {
-      this.zkw = zkw;
-      this.errorReporter = errorReporter;
-      this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, 
conf, connection);
-      this.queuesClient.init();
-      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, 
conf, this.queuesClient,
-        connection);
-      this.replicationPeers.init();
-      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
-    } catch (ReplicationException e) {
-      throw new IOException("failed to construct ReplicationChecker", e);
-    }
-
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", 
"replication");
-    String replicationZNode = ZKUtil.joinZNode(this.zkw.baseZNode, 
replicationZNodeName);
-    String hfileRefsZNodeName =
-        
conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
-          
ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
-    hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
+    this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection);
+    this.errorReporter = errorReporter;
   }
 
   public boolean hasUnDeletedQueues() {
-    return errorReporter.getErrorList()
-        
.contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+    return errorReporter.getErrorList().contains(
+      HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
   }
 
   public void checkUnDeletedQueues() throws IOException {
-    Set<String> peerIds = new 
HashSet<String>(this.replicationPeers.getAllPeerIds());
-    try {
-      List<String> replicators = this.queuesClient.getListOfReplicators();
-      for (String replicator : replicators) {
-        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
-        for (String queueId : queueIds) {
-          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-          if (!peerIds.contains(queueInfo.getPeerId())) {
-            if (!undeletedQueueIds.containsKey(replicator)) {
-              undeletedQueueIds.put(replicator, new ArrayList<String>());
-            }
-            undeletedQueueIds.get(replicator).add(queueId);
-
-            String msg = "Undeleted replication queue for removed peer found: "
-                + String.format("[removedPeerId=%s, replicator=%s, 
queueId=%s]",
-                  queueInfo.getPeerId(), replicator, queueId);
-            errorReporter.reportError(
-              HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, 
msg);
-          }
-        }
-      }
-    } catch (KeeperException ke) {
-      throw new IOException(ke);
-    }
-
-    checkUnDeletedHFileRefsQueues(peerIds);
-  }
-
-  private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws 
IOException {
-    try {
-      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
-        return;
-      }
-      List<String> listOfPeers = 
this.queuesClient.getAllPeersFromHFileRefsQueue();
-      Set<String> peers = new HashSet<>(listOfPeers);
-      peers.removeAll(peerIds);
-      if (!peers.isEmpty()) {
-        undeletedHFileRefsQueueIds.addAll(peers);
-        String msg =
-            "Undeleted replication hfile-refs queue for removed peer found: "
-                + undeletedHFileRefsQueueIds + " under hfile-refs node " + 
hfileRefsZNode;
+    undeletedQueueIds = cleaner.getUnDeletedQueues();
+    for (Entry<String, List<String>> replicatorAndQueueIds : 
undeletedQueueIds.entrySet()) {
+      String replicator = replicatorAndQueueIds.getKey();
+      for (String queueId : replicatorAndQueueIds.getValue()) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        String msg = "Undeleted replication queue for removed peer found: "
+            + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", 
queueInfo.getPeerId(),
+              replicator, queueId);
         
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
           msg);
       }
-    } catch (KeeperException e) {
-      throw new IOException("Failed to get list of all peers from hfile-refs 
znode "
-          + hfileRefsZNode, e);
     }
-  }
 
-  private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
-    public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, 
Abortable abortable) {
-      super(zk, conf, abortable);
-    }
+    checkUnDeletedHFileRefsQueues();
+  }
 
-    public void removeQueue(String replicator, String queueId) throws 
IOException {
-      String queueZnodePath = 
ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
-        queueId);
-      try {
-        ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
-        LOG.info("remove replication queue, replicator: " + replicator + ", 
queueId: " + queueId);
-      } catch (KeeperException e) {
-        throw new IOException("failed to delete queue, replicator: " + 
replicator + ", queueId: "
-            + queueId);
-      }
+  private void checkUnDeletedHFileRefsQueues() throws IOException {
+    undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues();
+    if (undeletedHFileRefsQueueIds != null && 
!undeletedHFileRefsQueueIds.isEmpty()) {
+      String msg = "Undeleted replication hfile-refs queue for removed peer 
found: "
+          + undeletedHFileRefsQueueIds + " under hfile-refs node";
+      errorReporter
+          
.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, 
msg);
     }
   }
 
   public void fixUnDeletedQueues() throws IOException {
-    for (Entry<String, List<String>> replicatorAndQueueIds : 
undeletedQueueIds.entrySet()) {
-      String replicator = replicatorAndQueueIds.getKey();
-      for (String queueId : replicatorAndQueueIds.getValue()) {
-        queueDeletor.removeQueue(replicator, queueId);
-      }
+    if (!undeletedQueueIds.isEmpty()) {
+      cleaner.removeQueues(undeletedQueueIds);
     }
     fixUnDeletedHFileRefsQueue();
   }
 
   private void fixUnDeletedHFileRefsQueue() throws IOException {
-    for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
-      String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
-      try {
-        ZKUtil.deleteNodeRecursively(this.zkw, node);
-        LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + 
" from path "
-            + hfileRefsZNode);
-      } catch (KeeperException e) {
-        throw new IOException("Failed to delete hfile-refs queue " + 
hfileRefsQueueId
-            + " from path " + hfileRefsZNode);
-      }
+    if (undeletedHFileRefsQueueIds != null && 
!undeletedHFileRefsQueueIds.isEmpty()) {
+      cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d87b05f0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
new file mode 100644
index 0000000..6cb4973
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReplicationZKNodeCleaner {
+  private final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private final String ID_ONE = "1";
+  private final String SERVER_ONE = "server1";
+  private final String ID_TWO = "2";
+  private final String SERVER_TWO = "server2";
+
+  private final Configuration conf;
+  private final ZooKeeperWatcher zkw;
+  private final ReplicationQueues repQueues;
+
+  public TestReplicationZKNodeCleaner() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    zkw = new ZooKeeperWatcher(conf, "TestReplicationZKNodeCleaner", null);
+    repQueues = ReplicationFactory.getReplicationQueues(zkw, conf, null);
+    assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 
10000);
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testReplicationZKNodeCleaner() throws Exception {
+    repQueues.init(SERVER_ONE);
+    // add queue for ID_ONE which isn't exist
+    repQueues.addLog(ID_ONE, "file1");
+
+    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, 
null);
+    Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+    assertEquals(1, undeletedQueues.size());
+    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
+    assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
+    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
+
+    // add a recovery queue for ID_TWO which isn't exist
+    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+
+    undeletedQueues = cleaner.getUnDeletedQueues();
+    assertEquals(1, undeletedQueues.size());
+    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
+    assertEquals(2, undeletedQueues.get(SERVER_ONE).size());
+    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
+    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + 
SERVER_TWO));
+
+    cleaner.removeQueues(undeletedQueues);
+    undeletedQueues = cleaner.getUnDeletedQueues();
+    assertEquals(0, undeletedQueues.size());
+  }
+
+  @Test
+  public void testReplicationZKNodeCleanerChore() throws Exception {
+    repQueues.init(SERVER_ONE);
+    // add queue for ID_ONE which isn't exist
+    repQueues.addLog(ID_ONE, "file1");
+    // add a recovery queue for ID_TWO which isn't exist
+    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+
+    // Wait the cleaner chore to run
+    Thread.sleep(20000);
+
+    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, 
null);
+    assertEquals(0, cleaner.getUnDeletedQueues().size());
+  }
+}

Reply via email to