ACCUMULO-3887 Support optional sessionId in tserver stop. Also added more debugging inside the master around the FATE shutdownTServer op. New unit tests as well.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c5aa060e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c5aa060e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c5aa060e Branch: refs/heads/master Commit: c5aa060ee90fe5fa1fe9923973394d4b204f7f62 Parents: d764d27 Author: Josh Elser <[email protected]> Authored: Wed Jun 3 20:08:46 2015 -0400 Committer: Josh Elser <[email protected]> Committed: Wed Jun 3 21:00:06 2015 -0400 ---------------------------------------------------------------------- .../accumulo/server/master/LiveTServerSet.java | 27 +++++- .../org/apache/accumulo/server/util/Admin.java | 45 +++++++++- .../server/master/LiveTServerSetTest.java | 52 +++++++++++ .../apache/accumulo/server/util/AdminTest.java | 94 ++++++++++++++++++++ .../master/MasterClientServiceHandler.java | 5 ++ .../master/tserverOps/ShutdownTServer.java | 2 +- 6 files changed, 219 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java index 0ea6b41..0c0cceb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java @@ -367,10 +367,29 @@ public class LiveTServerSet implements Watcher { } public synchronized TServerInstance find(String tabletServer) { - HostAndPort addr = AddressUtil.parseAddress(tabletServer, false); - for (Entry<String,TServerInfo> entry : current.entrySet()) { - if (entry.getValue().instance.getLocation().equals(addr)) - return 
entry.getValue().instance; + return find(current, tabletServer); + } + + TServerInstance find(Map<String,TServerInfo> servers, String tabletServer) { + HostAndPort addr; + String sessionId = null; + if (']' == tabletServer.charAt(tabletServer.length() - 1)) { + int index = tabletServer.indexOf('['); + if (-1 == index) { + throw new IllegalArgumentException("Could not parse tabletserver '" + tabletServer + "'"); + } + addr = AddressUtil.parseAddress(tabletServer.substring(0, index), false); + // Strip off the last bracket + sessionId = tabletServer.substring(index + 1, tabletServer.length() - 1); + } else { + addr = AddressUtil.parseAddress(tabletServer, false); + } + for (Entry<String,TServerInfo> entry : servers.entrySet()) { + if (entry.getValue().instance.getLocation().equals(addr)) { + // Return the instance if we have no desired session ID, or we match the desired session ID + if (null == sessionId || sessionId.equals(entry.getValue().instance.getSession())) + return entry.getValue().instance; + } } return null; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index bdbc7f0..c7f9739 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -31,6 +31,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; @@ -54,12 +55,17 @@ import org.apache.accumulo.core.security.SystemPermission; import 
org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.trace.Tracer; import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; +import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.cli.ClientOpts; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.security.SecurityUtil; import org.apache.accumulo.start.spi.KeywordExecutable; import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +73,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import com.google.auto.service.AutoService; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.net.HostAndPort; @@ -362,9 +369,12 @@ public class Admin implements KeywordExecutable { log.info("No masters running. 
Not attempting safe unload of tserver."); return; } + final Instance instance = context.getInstance(); + final String zTServerRoot = getTServersZkPath(instance); + final ZooCache zc = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); for (String server : servers) { HostAndPort address = AddressUtil.parseAddress(server, context.getConfiguration().getPort(Property.TSERV_CLIENTPORT)); - final String finalServer = address.toString(); + final String finalServer = qualifyWithZooKeeperSessionId(zTServerRoot, zc, address.toString()); log.info("Stopping server " + finalServer); MasterClient.execute(context, new ClientExec<MasterClientService.Client>() { @Override @@ -375,6 +385,39 @@ public class Admin implements KeywordExecutable { } } + /** + * Get the parent ZNode for tservers for the given instance + * + * @param instance + * The Instance + * @return The tservers znode for the instance + */ + static String getTServersZkPath(Instance instance) { + Preconditions.checkNotNull(instance); + final String instanceRoot = ZooUtil.getRoot(instance); + return instanceRoot + Constants.ZTSERVERS; + } + + /** + * Look up the TabletServers in ZooKeeper and try to find a sessionID for this server reference + * + * @param hostAndPort + * The host and port for a TabletServer + * @return The host and port with the session ID in square-brackets appended, or the original value. 
+ */ + static String qualifyWithZooKeeperSessionId(String zTServerRoot, ZooCache zooCache, String hostAndPort) { + try { + long sessionId = ZooLock.getSessionId(zooCache, zTServerRoot + "/" + hostAndPort); + if (0 == sessionId) { + return hostAndPort; + } + return hostAndPort + "[" + sessionId + "]"; + } catch (InterruptedException | KeeperException e) { + log.warn("Failed to communicate with ZooKeeper to find session ID for TabletServer."); + return hostAndPort; + } + } + private static final String ACCUMULO_SITE_BACKUP_FILE = "accumulo-site.xml.bak"; private static final String NS_FILE_SUFFIX = "_ns.cfg"; private static final String USER_FILE_SUFFIX = "_user.cfg"; http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java new file mode 100644 index 0000000..bcdb832 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/master/LiveTServerSetTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.master; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.server.master.LiveTServerSet.Listener; +import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; +import org.apache.accumulo.server.master.LiveTServerSet.TServerInfo; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.easymock.EasyMock; +import org.junit.Test; + +import com.google.common.net.HostAndPort; + +public class LiveTServerSetTest { + + @Test + public void testSessionIds() { + Map<String,TServerInfo> servers = new HashMap<>(); + TServerConnection mockConn = EasyMock.createMock(TServerConnection.class); + + TServerInfo server1 = new TServerInfo(new TServerInstance(HostAndPort.fromParts("localhost", 1234), "5555"), mockConn); + servers.put("server1", server1); + + LiveTServerSet tservers = new LiveTServerSet(EasyMock.createMock(ClientContext.class), EasyMock.createMock(Listener.class)); + + assertEquals(server1.instance, tservers.find(servers, "localhost:1234")); + assertNull(tservers.find(servers, "localhost:4321")); + assertEquals(server1.instance, tservers.find(servers, "localhost:1234[5555]")); + assertNull(tservers.find(servers, "localhost:1234[55755]")); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java new file mode 100644 index 0000000..9685b28 --- /dev/null +++ 
b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.util; + +import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.zookeeper.data.Stat; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.Test; + +public class AdminTest { + + @Test + public void testZooKeeperTserverPath() { + Instance instance = EasyMock.createMock(Instance.class); + String instanceId = UUID.randomUUID().toString(); + + EasyMock.expect(instance.getInstanceID()).andReturn(instanceId); + + EasyMock.replay(instance); + + assertEquals(Constants.ZROOT + "/" + instanceId + Constants.ZTSERVERS, Admin.getTServersZkPath(instance)); + + EasyMock.verify(instance); + } + + @Test + public void testQualifySessionId() { + ZooCache zc = EasyMock.createMock(ZooCache.class); + + String root = "/accumulo/id/tservers"; + String server = "localhost:12345"; + final long session = 123456789l; + + String 
serverPath = root + "/" + server; + EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.singletonList("child")); + EasyMock.expect(zc.get(EasyMock.eq(serverPath + "/child"), EasyMock.anyObject(Stat.class))).andAnswer(new IAnswer<byte[]>() { + + @Override + public byte[] answer() throws Throwable { + Stat stat = (Stat) EasyMock.getCurrentArguments()[1]; + stat.setEphemeralOwner(session); + return new byte[0]; + } + + }); + + EasyMock.replay(zc); + + assertEquals(server + "[" + session + "]", Admin.qualifyWithZooKeeperSessionId(root, zc, server)); + + EasyMock.verify(zc); + } + + @Test + public void testCannotQualifySessionId() { + ZooCache zc = EasyMock.createMock(ZooCache.class); + + String root = "/accumulo/id/tservers"; + String server = "localhost:12345"; + + String serverPath = root + "/" + server; + EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.<String> emptyList()); + + EasyMock.replay(zc); + + // A server that isn't in ZooKeeper. Can't qualify it, should return the original + assertEquals(server, Admin.qualifyWithZooKeeperSessionId(root, zc, server)); + + EasyMock.verify(zc); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java index 592d9ae..e65dcec 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java +++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java @@ -295,9 +295,14 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli } long tid = master.fate.startTransaction(); + + log.debug("Seeding FATE op to shutdown " + tabletServer + " with tid " + tid); + 
master.fate.seedTransaction(tid, new TraceRepo<Master>(new ShutdownTServer(doomed, force)), false); master.fate.waitForCompletion(tid); master.fate.delete(tid); + + log.debug("FATE op shutting down " + tabletServer + " finished"); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/c5aa060e/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java index 5330197..11cd91b 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tserverOps/ShutdownTServer.java @@ -77,7 +77,7 @@ public class ShutdownTServer extends MasterRepo { log.error("Error talking to tablet server " + server + ": " + ex); } - // If the connection was non-null and we could coomunicate with it + // If the connection was non-null and we could communicate with it // give the master some more time to tell it to stop and for the // tserver to ack the request and stop itself. return 1000;
