Updated Branches: refs/heads/master 2ad6a8188 -> 80d1c3373
ACCUMULO-1585 track tablet servers by their entry in zookeeper, not by their resolved address ACCUMULO-1601 make interface hinting consistent across all servers Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/10b44e79 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/10b44e79 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/10b44e79 Branch: refs/heads/master Commit: 10b44e79544b5f16cd747de7926af23739bf5726 Parents: 1a48f7c Author: Eric Newton <e...@apache.org> Authored: Tue Jul 23 11:27:26 2013 -0400 Committer: Eric Newton <e...@apache.org> Committed: Tue Jul 23 11:27:26 2013 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/server/Accumulo.java | 12 --- .../server/gc/SimpleGarbageCollector.java | 71 +++++----------- .../accumulo/server/master/LiveTServerSet.java | 89 +++++++++++--------- .../apache/accumulo/server/master/Master.java | 14 +-- .../apache/accumulo/server/monitor/Monitor.java | 5 +- .../server/tabletserver/TabletServer.java | 20 +++-- .../accumulo/server/trace/TraceServer.java | 5 +- .../accumulo/server/util/TServerUtils.java | 35 ++++---- .../accumulo/server/gc/TestConfirmDeletes.java | 3 +- .../accumulo/test/functional/ZombieTServer.java | 8 +- .../test/performance/thrift/NullTserver.java | 2 +- .../org/apache/accumulo/test/ShellServerIT.java | 1 + .../accumulo/test/functional/MacTest.java | 3 - 13 files changed, 121 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/Accumulo.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/src/main/java/org/apache/accumulo/server/Accumulo.java index fa3f2f3..dc90a7b 100644 --- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java +++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java @@ -169,18 +169,6 @@ public class Accumulo { }, 1000, 10 * 60 * 1000); } - public static String getLocalAddress(String[] args) throws UnknownHostException { - InetAddress result = InetAddress.getLocalHost(); - for (int i = 0; i < args.length - 1; i++) { - if (args[i].equals("-a") || args[i].equals("--address")) { - result = InetAddress.getByName(args[i + 1]); - log.debug("Local address is: " + args[i + 1] + " (" + result.toString() + ")"); - break; - } - } - return result.getHostName(); - } - public static void waitForZookeeperAndHdfs(VolumeManager fs) { log.info("Attempting to talk to zookeeper"); while (true) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java index de73282..4387755 100644 --- a/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java +++ b/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java @@ -37,7 +37,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.cli.Help; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; @@ -79,6 +78,7 @@ import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; @@ -107,7 +107,7 @@ import com.beust.jcommander.Parameter; public class SimpleGarbageCollector implements Iface { private static final Text EMPTY_TEXT = new Text(); - static class Opts extends Help { + static class Opts extends ServerOpts { @Parameter(names = {"-v", "--verbose"}, description = "extra information will get printed to stdout also") boolean verbose = false; @Parameter(names = {"-s", "--safemode"}, description = "safe mode will not delete files") @@ -115,8 +115,6 @@ public class SimpleGarbageCollector implements Iface { @Parameter(names = {"-o", "--offline"}, description = "offline mode will run once and check data files directly; this is dangerous if accumulo is running or not shut down properly") boolean offline = false; - @Parameter(names = {"-a", "--address"}, description = "specify our local address") - String address = null; } // how much of the JVM's available memory should it use gathering candidates @@ -130,8 +128,7 @@ public class SimpleGarbageCollector implements Iface { private boolean checkForBulkProcessingFiles; private VolumeManager fs; private boolean useTrash = true; - private boolean safemode = false, offline = false, verbose = false; - private String address = "localhost"; + private Opts opts = new Opts(); private ZooLock lock; private Key continueKey = null; @@ -143,46 +140,21 @@ public class SimpleGarbageCollector implements Iface { public static void main(String[] args) throws UnknownHostException, IOException { SecurityUtil.serverLogin(); - Instance instance = HdfsZooInstance.getInstance(); ServerConfiguration serverConf = new ServerConfiguration(instance); final VolumeManager fs = VolumeManagerImpl.get(); Accumulo.init(fs, serverConf, "gc"); - String address = "localhost"; - SimpleGarbageCollector gc = new SimpleGarbageCollector(); Opts opts = new Opts(); - opts.parseArgs(SimpleGarbageCollector.class.getName(), args); - - if (opts.safeMode) - gc.setSafeMode(); - if (opts.offline) - gc.setOffline(); - if (opts.verbose) - gc.setVerbose(); - if (opts.address != null) - gc.useAddress(address); + opts.parseArgs("gc", args); + SimpleGarbageCollector gc = new SimpleGarbageCollector(opts); gc.init(fs, instance, SystemCredentials.get().getAsThrift(), serverConf.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE)); - Accumulo.enableTracing(address, "gc"); + Accumulo.enableTracing(opts.getAddress(), "gc"); gc.run(); } - public SimpleGarbageCollector() {} - - public void setSafeMode() { - this.safemode = true; - } - - public void setOffline() { - this.offline = true; - } - - public void setVerbose() { - this.verbose = true; - } - - public void useAddress(String address) { - this.address = address; + public SimpleGarbageCollector(Opts opts) { + this.opts = opts; } public void init(VolumeManager fs, Instance instance, TCredentials credentials, boolean noTrash) throws IOException { @@ -193,11 +165,11 @@ public class SimpleGarbageCollector implements Iface { gcStartDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_START); long gcDelay = instance.getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); numDeleteThreads = instance.getConfiguration().getCount(Property.GC_DELETE_THREADS); - log.info("start delay: " + (offline ? 0 + " sec (offline)" : gcStartDelay + " milliseconds")); + log.info("start delay: " + (opts.offline ? 0 + " sec (offline)" : gcStartDelay + " milliseconds")); log.info("time delay: " + gcDelay + " milliseconds"); - log.info("safemode: " + safemode); - log.info("offline: " + offline); - log.info("verbose: " + verbose); + log.info("safemode: " + opts.safeMode); + log.info("offline: " + opts.offline); + log.info("verbose: " + opts.verbose); log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory() + " bytes"); log.info("delete threads: " + numDeleteThreads); useTrash = !noTrash; @@ -208,7 +180,7 @@ public class SimpleGarbageCollector implements Iface { // Sleep for an initial period, giving the master time to start up and // old data files to be unused - if (!offline) { + if (!opts.offline) { try { getZooLock(startStatsService()); } catch (Exception ex) { @@ -254,8 +226,8 @@ public class SimpleGarbageCollector implements Iface { confirmDeletesSpan.stop(); // STEP 3: delete files - if (safemode) { - if (verbose) + if (opts.safeMode) { + if (opts.verbose) System.out.println("SAFEMODE: There are " + candidates.size() + " data file candidates marked for deletion.%n" + " Examine the log files to identify them.%n" + " They can be removed by executing: bin/accumulo gc --offline%n" + "WARNING: Do not run the garbage collector in offline mode unless you are positive%n" @@ -289,7 +261,7 @@ public class SimpleGarbageCollector implements Iface { tStop = System.currentTimeMillis(); log.info(String.format("Collect cycle took %.2f seconds", ((tStop - tStart) / 1000.0))); - if (offline) + if (opts.offline) break; if (candidateMemExceeded) { @@ -421,13 +393,14 @@ public class SimpleGarbageCollector implements Iface { Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(this)); int port = instance.getConfiguration().getPort(Property.GC_PORT); long maxMessageSize = instance.getConfiguration().getMemoryInBytes(Property.GENERAL_MAX_MESSAGE_SIZE); + InetSocketAddress result = new InetSocketAddress(opts.getAddress(), port); try { - TServerUtils.startTServer(port, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize); + TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize); } catch (Exception ex) { log.fatal(ex, ex); throw new RuntimeException(ex); } - return new InetSocketAddress(Accumulo.getLocalAddress(new String[] {"--address", address}), port); + return result; } /** @@ -436,7 +409,7 @@ public class SimpleGarbageCollector implements Iface { SortedSet<String> getCandidates() throws Exception { TreeSet<String> candidates = new TreeSet<String>(); - if (offline) { + if (opts.offline) { checkForBulkProcessingFiles = true; try { for (String validExtension : FileOperations.getValidExtensions()) { @@ -521,7 +494,7 @@ public class SimpleGarbageCollector implements Iface { private void confirmDeletes(String tableName, SortedSet<String> candidates) throws AccumuloException { Scanner scanner; - if (offline) { + if (opts.offline) { // TODO throw new RuntimeException("Offline scanner no longer supported"); // try { @@ -634,7 +607,7 @@ public class SimpleGarbageCollector implements Iface { // deletes; Need separate writer for the root tablet. BatchWriter writer = null; BatchWriter rootWriter = null; - if (!offline) { + if (!opts.offline) { Connector c; try { c = instance.getConnector(SystemCredentials.get().getPrincipal(), SystemCredentials.get().getToken()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java index 68255b8..3cb73b6 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java +++ b/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.Constants; @@ -201,8 +202,12 @@ public class LiveTServerSet implements Watcher { } }; - // Map from tserver master service to server information + // The set of active tservers with locks, indexed by their name in zookeeper private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>(); + // as above, indexed by TServerInstance + private Map<TServerInstance, TServerInfo> currentInstances = new HashMap<TServerInstance, TServerInfo>(); + + // The set of entries in zookeeper without locks, and the first time each was noticed private Map<String,Long> locklessServers = new HashMap<String,Long>(); public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) { @@ -240,8 +245,8 @@ public class LiveTServerSet implements Watcher { locklessServers.keySet().retainAll(all); - for (String server : all) { - checkServer(updates, doomed, path, server); + for (String zPath : all) { + checkServer(updates, doomed, path, zPath); } // log.debug("Current: " + current.keySet()); @@ -262,42 +267,46 @@ public class LiveTServerSet implements Watcher { } } - private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String server) + private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String zPath) throws TException, InterruptedException, KeeperException { - TServerInfo info = current.get(server); + TServerInfo info = current.get(zPath); - final String lockPath = path + "/" + server; + final String lockPath = path + "/" + zPath; Stat stat = new Stat(); byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat); if (lockData == null) { if (info != null) { doomed.add(info.instance); - current.remove(server); + current.remove(zPath); + currentInstances.remove(info.instance); } - Long firstSeen = locklessServers.get(server); + Long firstSeen = locklessServers.get(zPath); if (firstSeen == null) { - locklessServers.put(server, System.currentTimeMillis()); - } else if (System.currentTimeMillis() - firstSeen > 600000) { - deleteServerNode(path + "/" + server); - locklessServers.remove(server); + locklessServers.put(zPath, System.currentTimeMillis()); + } else if (System.currentTimeMillis() - firstSeen > 10*60*1000) { + deleteServerNode(path + "/" + zPath); + locklessServers.remove(zPath); } } else { - locklessServers.remove(server); + locklessServers.remove(zPath); ServerServices services = new ServerServices(new String(lockData)); InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT); - InetSocketAddress addr = AddressUtil.parseAddress(server); TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner()); if (info == null) { updates.add(instance); - current.put(server, new TServerInfo(instance, new TServerConnection(addr))); + TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + current.put(zPath, tServerInfo); + currentInstances.put(instance, tServerInfo); } else if (!info.instance.equals(instance)) { doomed.add(info.instance); updates.add(instance); - current.put(server, new TServerInfo(instance, new TServerConnection(addr))); + TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(client)); + current.put(zPath, tServerInfo); + currentInstances.put(info.instance, tServerInfo); } } } @@ -339,53 +348,51 @@ public class LiveTServerSet implements Watcher { public synchronized TServerConnection getConnection(TServerInstance server) throws TException { if (server == null) return null; - TServerInfo serverInfo = current.get(server.hostPort()); - // lock was lost? - if (serverInfo == null) - return null; - // instance changed? - if (!serverInfo.instance.equals(server)) + TServerInfo tServerInfo = currentInstances.get(server); + if (tServerInfo == null) return null; - TServerConnection result = serverInfo.connection; - return result; + return tServerInfo.connection; } public synchronized Set<TServerInstance> getCurrentServers() { - HashSet<TServerInstance> result = new HashSet<TServerInstance>(); - for (TServerInfo c : current.values()) { - result.add(c.instance); - } - return result; + return new HashSet<TServerInstance>(currentInstances.keySet()); } public synchronized int size() { return current.size(); } - public synchronized TServerInstance find(String serverName) { - TServerInfo serverInfo = current.get(serverName); - if (serverInfo != null) { - return serverInfo.instance; + public synchronized TServerInstance find(String tabletServer) { + InetSocketAddress addr = AddressUtil.parseAddress(tabletServer); + for (Entry<String,TServerInfo> entry : current.entrySet()) { + if (entry.getValue().instance.getLocation().equals(addr)) + return entry.getValue().instance; } return null; } - public synchronized boolean isOnline(String serverName) { - return current.containsKey(serverName); - } - public synchronized void remove(TServerInstance server) { - current.remove(server.hostPort()); + String zPath = null; + for (Entry<String,TServerInfo> entry : current.entrySet()) { + if (entry.getValue().instance.equals(server)) { + zPath = entry.getKey(); + break; + } + } + if (zPath == null) + return; + current.remove(zPath); + currentInstances.remove(server); log.info("Removing zookeeper lock for " + server); - String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + server.hostPort(); + String fullpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + zPath; try { - ZooReaderWriter.getRetryingInstance().recursiveDelete(zpath, SKIP); + ZooReaderWriter.getRetryingInstance().recursiveDelete(fullpath, SKIP); } catch (Exception e) { String msg = "error removing tablet server lock"; log.fatal(msg, e); Halt.halt(msg, -1); } - getZooCache().clear(zpath); + getZooCache().clear(fullpath); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/master/Master.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/master/Master.java b/server/src/main/java/org/apache/accumulo/server/master/Master.java index 3d14d12..bb7fa62 100644 --- a/server/src/main/java/org/apache/accumulo/server/master/Master.java +++ b/server/src/main/java/org/apache/accumulo/server/master/Master.java @@ -89,6 +89,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; @@ -686,9 +687,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer, boolean force) throws ThriftSecurityException, TException { security.canPerformSystemActions(c); - final InetSocketAddress addr = AddressUtil.parseAddress(tabletServer); - final String addrString = org.apache.accumulo.core.util.AddressUtil.toString(addr); - final TServerInstance doomed = tserverSet.find(addrString); + final TServerInstance doomed = tserverSet.find(tabletServer); if (!force) { final TServerConnection server = tserverSet.getConnection(doomed); if (server == null) { @@ -1512,7 +1511,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler())); - clientService = TServerUtils.startServer(getSystemConfiguration(), Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, + clientService = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE).server; while (!clientService.isServing()) { @@ -1597,8 +1596,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt private void getMasterLock(final String zMasterLoc) throws KeeperException, InterruptedException { log.info("trying to get master lock"); - final String masterClientAddress = org.apache.accumulo.core.util.AddressUtil.toString(new InetSocketAddress(hostname, getSystemConfiguration().getPort( - Property.MASTER_CLIENTPORT))); + final String masterClientAddress = hostname + ":" + getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT); while (true) { @@ -1629,7 +1627,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt SecurityUtil.serverLogin(); VolumeManager fs = VolumeManagerImpl.get(); - String hostname = Accumulo.getLocalAddress(args); + ServerOpts opts = new ServerOpts(); + opts.parseArgs("master", args); + String hostname = opts.getAddress(); Instance instance = HdfsZooInstance.getInstance(); ServerConfiguration conf = new ServerConfiguration(instance); Accumulo.init(fs, conf, "master"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java index 5957f26..2632b4e 100644 --- a/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java +++ b/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java @@ -49,6 +49,7 @@ import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fs.VolumeManager; @@ -450,7 +451,9 @@ public class Monitor { SecurityUtil.serverLogin(); VolumeManager fs = VolumeManagerImpl.get(); - String hostname = Accumulo.getLocalAddress(args); + ServerOpts opts = new ServerOpts(); + opts.parseArgs("monitor", args); + String hostname = opts.getAddress(); instance = HdfsZooInstance.getInstance(); config = new ServerConfiguration(instance); Accumulo.init(fs, config, "monitor"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index 9824c64..7425fed 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@ -117,7 +117,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor; import org.apache.accumulo.core.tabletserver.thrift.TabletStats; -import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.ByteBufferUtil; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.Daemon; @@ -136,6 +135,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.Accumulo; import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; @@ -187,7 +187,7 @@ import org.apache.accumulo.server.util.MapCounter; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.util.MetadataTableUtil.LogEntry; import org.apache.accumulo.server.util.TServerUtils; -import org.apache.accumulo.server.util.TServerUtils.ServerPort; +import org.apache.accumulo.server.util.TServerUtils.ServerAddress; import org.apache.accumulo.server.util.time.RelativeTime; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; @@ -2611,11 +2611,11 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu MetadataTableUtil.addLogEntry(SystemCredentials.get().getAsThrift(), entry, getLock()); } - private int startServer(AccumuloConfiguration conf, Property portHint, TProcessor processor, String threadName) throws UnknownHostException { - ServerPort sp = TServerUtils.startServer(conf, portHint, processor, this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH, + private int startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName) throws UnknownHostException { + ServerAddress sp = TServerUtils.startServer(conf, address, portHint, processor, this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); this.server = sp.server; - return sp.port; + return sp.address.getPort(); } private String getMasterAddress() { @@ -2655,7 +2655,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu // start listening for client connection last Iface tch = TraceWrap.service(new ThriftClientHandler()); Processor<Iface> processor = new Processor<Iface>(tch); - int port = startServer(getSystemConfiguration(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); + int port = startServer(getSystemConfiguration(), clientAddress.getHostName(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server"); log.info("port = " + port); return port; } @@ -2733,7 +2733,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu if (clientPort == 0) { throw new RuntimeException("Failed to start the tablet client service"); } - clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort); + clientAddress = new InetSocketAddress(clientAddress.getHostName(), clientPort); announceExistence(); ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue"); @@ -3012,7 +3012,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu public String getClientAddressString() { if (clientAddress == null) return null; - return AddressUtil.toString(clientAddress); + return clientAddress.getHostName() + ":" + clientAddress.getPort(); } TServerInstance getTabletSession() { @@ -3213,7 +3213,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu try { SecurityUtil.serverLogin(); VolumeManager fs = VolumeManagerImpl.get(); - String hostname = Accumulo.getLocalAddress(args); + ServerOpts opts = new ServerOpts(); + opts.parseArgs("tserver", args); + String hostname = opts.getAddress(); Instance instance = HdfsZooInstance.getInstance(); ServerConfiguration conf = new ServerConfiguration(instance); Accumulo.init(fs, conf, "tserver"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java b/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java index 67a55fa..5875182 100644 --- a/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java +++ b/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; import org.apache.accumulo.server.Accumulo; +import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.fs.VolumeManager; @@ -256,11 +257,13 @@ public class TraceServer implements Watcher { public static void main(String[] args) throws Exception { SecurityUtil.serverLogin(); + ServerOpts opts = new ServerOpts(); + opts.parseArgs("tracer", args); Instance instance = HdfsZooInstance.getInstance(); ServerConfiguration conf = new ServerConfiguration(instance); VolumeManager fs = VolumeManagerImpl.get(); Accumulo.init(fs, conf, "tracer"); - String hostname = Accumulo.getLocalAddress(args); + String hostname = opts.getAddress(); TraceServer server = new TraceServer(conf, hostname); Accumulo.enableTracing(hostname, "tserver"); server.run(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java index 0c751f5..926e370 100644 --- a/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java +++ b/server/src/main/java/org/apache/accumulo/server/util/TServerUtils.java @@ -55,13 +55,13 @@ public class TServerUtils { public static final ThreadLocal<String> clientAddress = new ThreadLocal<String>(); - public static class ServerPort { + public static class ServerAddress { public final TServer server; - public final int port; + public final InetSocketAddress address; - public ServerPort(TServer server, int port) { + public ServerAddress(TServer server, InetSocketAddress address) { this.server = server; - this.port = port; + this.address = address; } } @@ -83,7 +83,7 @@ public class TServerUtils { * @throws UnknownHostException * when we don't know our own address */ - public static ServerPort startServer(AccumuloConfiguration conf, Property portHintProperty, TProcessor processor, String serverName, String threadName, + public static ServerAddress startServer(AccumuloConfiguration conf, String address, Property portHintProperty, TProcessor processor, String serverName, String threadName, Property portSearchProperty, Property minThreadProperty, Property timeBetweenThreadChecksProperty, @@ -118,7 +118,8 @@ public class TServerUtils { if (port > 65535) port = 1024 + port % (65535 - 1024); try { - return TServerUtils.startTServer(port, timedProcessor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize); + InetSocketAddress addr = new InetSocketAddress(address, port); + return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize); } catch (Exception ex) { log.info("Unable to use port " + port + ", retrying. (Thread Name = " + threadName + ")"); UtilWaitThread.sleep(250); @@ -212,9 +213,9 @@ public class TServerUtils { } } - public static ServerPort startHsHaServer(int port, TProcessor processor, final String serverName, String threadName, final int numThreads, + public static ServerAddress startHsHaServer(InetSocketAddress address, TProcessor processor, final String serverName, String threadName, final int numThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { - TNonblockingServerSocket transport = new TNonblockingServerSocket(port); + TNonblockingServerSocket transport = new TNonblockingServerSocket(address); THsHaServer.Args options = new THsHaServer.Args(transport); options.protocolFactory(ThriftUtil.protocolFactory()); options.transportFactory(ThriftUtil.transportFactory(maxMessageSize)); @@ -249,10 +250,10 @@ public class TServerUtils { }, timeBetweenThreadChecks, timeBetweenThreadChecks); options.executorService(pool); options.processorFactory(new TProcessorFactory(processor)); - return new ServerPort(new THsHaServer(options), port); + return new ServerAddress(new THsHaServer(options), address); } - public static ServerPort startThreadPoolServer(int port, TProcessor processor, String serverName, String threadName, int numThreads) + public static ServerAddress startThreadPoolServer(InetSocketAddress address, TProcessor processor, String serverName, String threadName, int numThreads) throws TTransportException { // if port is zero, then we must bind to get the port number @@ -260,8 +261,8 @@ public class TServerUtils { try { sock = ServerSocketChannel.open().socket(); sock.setReuseAddress(true); - sock.bind(new InetSocketAddress(port)); - port = sock.getLocalPort(); + sock.bind(address); + address = new InetSocketAddress(address.getHostName(), sock.getLocalPort()); } catch (IOException ex) { throw new TTransportException(ex); } @@ -270,17 +271,17 @@ public class TServerUtils { options.protocolFactory(ThriftUtil.protocolFactory()); options.transportFactory(ThriftUtil.transportFactory()); options.processorFactory(new ClientInfoProcessorFactory(processor)); - return new ServerPort(new TThreadPoolServer(options), port); + return new ServerAddress(new TThreadPoolServer(options), address); } - public static ServerPort startTServer(int port, TProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize) + public static ServerAddress startTServer(InetSocketAddress address, TProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { - return startTServer(port, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize); + return startTServer(address, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize); } - public static ServerPort startTServer(int port, TimedProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize) + public static ServerAddress startTServer(InetSocketAddress address, TimedProcessor processor, String serverName, String threadName, int numThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { - ServerPort result = startHsHaServer(port, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize); + ServerAddress result = startHsHaServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize); // ServerPort result = startThreadPoolServer(port, processor, serverName, threadName, -1); final TServer finalServer = result.server; Runnable serveTask = new Runnable() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java b/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java index 9ac0b50..28d727d 100644 --- a/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java +++ b/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.security.CredentialHelper; import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.gc.SimpleGarbageCollector.Opts; import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; @@ -98,7 +99,7 @@ public class TestConfirmDeletes { load(instance, metadata, deletes); - SimpleGarbageCollector gc = new SimpleGarbageCollector(); + SimpleGarbageCollector gc = new SimpleGarbageCollector(new Opts()); gc.init(fs, instance, auth, false); SortedSet<String> candidates = gc.getCandidates(); Assert.assertEquals(expectedInitial, candidates.size()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 2afca25..ab40304 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.test.functional; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Random; @@ -39,7 +38,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.util.TServerUtils; -import org.apache.accumulo.server.util.TServerUtils.ServerPort; +import org.apache.accumulo.server.util.TServerUtils.ServerAddress; import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; @@ -99,10 +98,9 @@ public class ZombieTServer { TransactionWatcher watcher = new TransactionWatcher(); final ThriftClientHandler tch = new ThriftClientHandler(instance, watcher); Processor<Iface> processor = new Processor<Iface>(tch); - ServerPort serverPort = TServerUtils.startTServer(port, processor, "ZombieTServer", "walking dead", 2, 1000, 10*1024*1024); + ServerAddress serverPort = TServerUtils.startTServer(new InetSocketAddress(port), processor, "ZombieTServer", "walking dead", 2, 1000, 10*1024*1024); - InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), serverPort.port); - String addressString = AddressUtil.toString(addr); + String addressString = AddressUtil.toString(serverPort.address); String zPath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + addressString; ZooReaderWriter zoo = ZooReaderWriter.getInstance(); zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 41a4d54..e33603f 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -220,7 +220,7 @@ public class NullTserver { TransactionWatcher watcher = new TransactionWatcher(); ThriftClientHandler tch = new ThriftClientHandler(HdfsZooInstance.getInstance(), watcher); Processor<Iface> processor = new Processor<Iface>(tch); - TServerUtils.startTServer(opts.port, processor, "NullTServer", "null tserver", 2, 1000, 10 * 1024 * 1024); + TServerUtils.startTServer(new InetSocketAddress(opts.port), processor, "NullTServer", "null tserver", 2, 1000, 10 * 1024 * 1024); InetSocketAddress addr = new InetSocketAddress(InetAddress.getLocalHost(), opts.port); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java index 37c7e43..b3d6479 100644 --- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java @@ -627,6 +627,7 @@ public class ShellServerIT { exec("insert \\x02 cf cq value", true); exec("scan -b 02", true, "value", false); exec("interpreter -i org.apache.accumulo.core.util.interpret.HexScanInterpreter", true); + UtilWaitThread.sleep(500); exec("interpreter -l", true, "HexScan", true); exec("scan -b 02", true, "value", true); exec("deletetable -f t", true); http://git-wip-us.apache.org/repos/asf/accumulo/blob/10b44e79/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java b/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java index 5fe60e2..19dfa6b 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/MacTest.java @@ -22,7 +22,6 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.minicluster.MiniAccumuloCluster; -import org.apache.accumulo.minicluster.MiniAccumuloCluster.LogWriter; import org.apache.accumulo.minicluster.MiniAccumuloConfig; import org.apache.log4j.Logger; import org.junit.After; @@ -58,8 +57,6 @@ public class MacTest { public void tearDown() throws Exception { if (cluster != null) cluster.stop(); - for (LogWriter log : cluster.getLogWriters()) - log.flush(); folder.delete(); }