Author: ecn
Date: Wed Apr 4 19:10:01 2012
New Revision: 1309550
URL: http://svn.apache.org/viewvc?rev=1309550&view=rev
Log:
ACCUMULO-14: first minor step towards this, build all the servers the same way,
passing FileSystem and Instance
Modified:
accumulo/trunk/bin/config.sh
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
Modified: accumulo/trunk/bin/config.sh
URL:
http://svn.apache.org/viewvc/accumulo/trunk/bin/config.sh?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
--- accumulo/trunk/bin/config.sh (original)
+++ accumulo/trunk/bin/config.sh Wed Apr 4 19:10:01 2012
@@ -52,7 +52,7 @@ fi
if [ -z "$HADOOP_HOME" ]
then
HADOOP_HOME="`which hadoop`"
- if [ -z "$HADOOP_HOME"]
+ if [ -z "$HADOOP_HOME" ]
then
echo "You must set HADOOP_HOME"
exit 1
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java
Wed Apr 4 19:10:01 2012
@@ -24,16 +24,13 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.Version;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -47,33 +44,21 @@ import org.apache.zookeeper.KeeperExcept
public class Accumulo {
private static final Logger log = Logger.getLogger(Accumulo.class);
- private static Integer dataVersion = null;
- public static synchronized void updateAccumuloVersion() {
- Configuration conf = CachedConfiguration.getInstance();
+ public static synchronized void updateAccumuloVersion(FileSystem fs) {
try {
- if (getAccumuloPersistentVersion() == Constants.PREV_DATA_VERSION) {
- FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf,
ServerConfiguration.getSiteConfiguration()));
-
+ if (getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) {
fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" +
Constants.DATA_VERSION));
fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" +
Constants.PREV_DATA_VERSION), false);
-
- dataVersion = null;
}
} catch (IOException e) {
throw new RuntimeException("Unable to set accumulo version: an error
occurred.", e);
}
-
}
- public static synchronized int getAccumuloPersistentVersion() {
- if (dataVersion != null)
- return dataVersion;
-
- Configuration conf = CachedConfiguration.getInstance();
+ public static synchronized int getAccumuloPersistentVersion(FileSystem fs) {
+ int dataVersion;
try {
- FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf,
ServerConfiguration.getSiteConfiguration()));
-
FileStatus[] files =
fs.listStatus(ServerConstants.getDataVersionLocation());
if (files == null || files.length == 0) {
dataVersion = -1; // assume it is 0.5 or earlier
@@ -84,7 +69,6 @@ public class Accumulo {
} catch (IOException e) {
throw new RuntimeException("Unable to read accumulo version: an error
occurred.", e);
}
-
}
public static void enableTracing(String address, String application) {
@@ -95,7 +79,7 @@ public class Accumulo {
}
}
- public static void init(String application) throws UnknownHostException {
+ public static void init(FileSystem fs, String application) throws
UnknownHostException {
System.setProperty("org.apache.accumulo.core.application", application);
@@ -126,10 +110,10 @@ public class Accumulo {
log.info(application + " starting");
log.info("Instance " + HdfsZooInstance.getInstance().getInstanceID());
- log.info("Data Version " + Accumulo.getAccumuloPersistentVersion());
- Accumulo.waitForZookeeperAndHdfs();
+ int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);
+ log.info("Data Version " + dataVersion);
+ Accumulo.waitForZookeeperAndHdfs(fs);
- int dataVersion = Accumulo.getAccumuloPersistentVersion();
Version codeVersion = new Version(Constants.VERSION);
if (dataVersion != Constants.DATA_VERSION && dataVersion !=
Constants.PREV_DATA_VERSION) {
throw new RuntimeException("This version of accumulo (" + codeVersion +
") is not compatible with files stored using data version " + dataVersion);
@@ -147,7 +131,7 @@ public class Accumulo {
}
}
- public static InetAddress getLocalAddress(String[] args) throws
UnknownHostException {
+ public static String getLocalAddress(String[] args) throws
UnknownHostException {
InetAddress result = InetAddress.getLocalHost();
for (int i = 0; i < args.length - 1; i++) {
if (args[i].equals("-a") || args[i].equals("--address")) {
@@ -156,10 +140,10 @@ public class Accumulo {
break;
}
}
- return result;
+ return result.getHostName();
}
- public static void waitForZookeeperAndHdfs() {
+ public static void waitForZookeeperAndHdfs(FileSystem fs) {
log.info("Attempting to talk to zookeeper");
while (true) {
try {
@@ -176,7 +160,6 @@ public class Accumulo {
long sleep = 1000;
while (true) {
try {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
if (!(fs instanceof DistributedFileSystem))
break;
DistributedFileSystem dfs = (DistributedFileSystem)
FileSystem.get(CachedConfiguration.getInstance());
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
Wed Apr 4 19:10:01 2012
@@ -99,6 +99,19 @@ import org.apache.zookeeper.KeeperExcept
public class SimpleGarbageCollector implements Iface {
private static final Text EMPTY_TEXT = new Text();
+ static final Options OPTS = new Options();
+ static final Option OPT_VERBOSE_MODE = new Option("v", "verbose", false,
"extra information will get printed to stdout also");
+ static final Option OPT_SAFE_MODE = new Option("s", "safemode", false, "safe
mode will not delete files");
+ static final Option OPT_OFFLINE = new Option("o", "offline", false,
+ "offline mode will run once and check data files directly; this is
dangerous if accumulo is running or not shut down properly");
+ static final Option OPT_ADDRESS = new Option("a", "address", true, "specify
our local address");
+ static {
+ OPTS.addOption(OPT_VERBOSE_MODE);
+ OPTS.addOption(OPT_SAFE_MODE);
+ OPTS.addOption(OPT_OFFLINE);
+ OPTS.addOption(OPT_ADDRESS);
+ }
+
// how much of the JVM's available memory should it use gathering candidates
private static final float CANDIDATE_MEMORY_PERCENTAGE = 0.75f;
private boolean candidateMemExceeded;
@@ -110,10 +123,8 @@ public class SimpleGarbageCollector impl
private long gcStartDelay;
private boolean checkForBulkProcessingFiles;
private FileSystem fs;
- private Option optSafeMode, optOffline, optVerboseMode, optAddress;
- private boolean safemode, offline, verbose;
- private String address;
- private CommandLine commandLine;
+ private boolean safemode = false, offline = false, verbose = false;
+ private String address = "localhost";
private ZooLock lock;
private Key continueKey = null;
@@ -122,52 +133,56 @@ public class SimpleGarbageCollector impl
private int numDeleteThreads;
public static void main(String[] args) throws UnknownHostException,
IOException {
- Accumulo.init("gc");
- SimpleGarbageCollector gc = new SimpleGarbageCollector(args);
-
- FileSystem fs;
- try {
- fs =
TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration()));
- } catch (IOException e) {
- String str = "Can't get default file system";
- log.fatal(str, e);
- throw new IllegalStateException(str, e);
- }
- gc.init(fs, HdfsZooInstance.getInstance(),
SecurityConstants.getSystemCredentials(),
ServerConfiguration.getSystemConfiguration());
- Accumulo.enableTracing(gc.address, "gc");
- gc.run();
- }
-
- public SimpleGarbageCollector(String[] args) throws UnknownHostException {
- Options opts = new Options();
- optVerboseMode = new Option("v", "verbose", false, "extra information will
get printed to stdout also");
- optSafeMode = new Option("s", "safemode", false, "safe mode will not
delete files");
- optOffline = new Option("o", "offline", false,
- "offline mode will run once and check data files directly; this is
dangerous if accumulo is running or not shut down properly");
- optAddress = new Option("a", "address", true, "specify our local address");
- opts.addOption(optVerboseMode);
- opts.addOption(optSafeMode);
- opts.addOption(optOffline);
- opts.addOption(optAddress);
-
+ final FileSystem fs =
FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration());
+ Accumulo.init(fs, "gc");
+ String address = "localhost";
+ SimpleGarbageCollector gc = new SimpleGarbageCollector();
try {
- commandLine = new BasicParser().parse(opts, args);
+ final CommandLine commandLine = new BasicParser().parse(OPTS, args);
if (commandLine.getArgs().length != 0)
throw new ParseException("Extraneous arguments");
- safemode = commandLine.hasOption(optSafeMode.getOpt());
- offline = commandLine.hasOption(optOffline.getOpt());
- verbose = commandLine.hasOption(optVerboseMode.getOpt());
- address = commandLine.getOptionValue(optAddress.getOpt());
+ if (commandLine.hasOption(OPT_SAFE_MODE.getOpt()))
+ gc.setSafeMode();
+ if (commandLine.hasOption(OPT_OFFLINE.getOpt()))
+ gc.setOffline();
+ if (commandLine.hasOption(OPT_VERBOSE_MODE.getOpt()))
+ gc.setVerbose();
+ address = commandLine.getOptionValue(OPT_ADDRESS.getOpt());
+ if (address != null)
+ gc.useAddress(address);
} catch (ParseException e) {
String str = "Can't parse the command line options";
log.fatal(str, e);
throw new IllegalArgumentException(str, e);
}
+
+ gc.init(fs, HdfsZooInstance.getInstance(),
SecurityConstants.getSystemCredentials(),
ServerConfiguration.getSystemConfiguration());
+ Accumulo.enableTracing(address, "gc");
+ gc.run();
+ }
+
+ public SimpleGarbageCollector() {
+ }
+
+ public void setSafeMode() {
+ this.safemode = true;
}
+ public void setOffline() {
+ this.offline = true;
+ }
+
+ public void setVerbose() {
+ this.verbose = true;
+ }
+
+ public void useAddress(String address) {
+ this.address = address;
+ }
+
public void init(FileSystem fs, Instance instance, AuthInfo credentials,
AccumuloConfiguration conf) {
- this.fs = fs;
+ this.fs = TraceFileSystem.wrap(fs);
this.instance = instance;
this.credentials = credentials;
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogService.java
Wed Apr 4 19:10:01 2012
@@ -36,6 +36,7 @@ import org.apache.accumulo.cloudtrace.in
import org.apache.accumulo.cloudtrace.thrift.TInfo;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
@@ -67,7 +68,6 @@ import org.apache.accumulo.server.util.T
import org.apache.accumulo.server.util.TServerUtils.ServerPort;
import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
@@ -89,18 +89,19 @@ import org.apache.zookeeper.Watcher.Even
public class LogService implements MutationLogger.Iface, Watcher {
static final org.apache.log4j.Logger LOG =
org.apache.log4j.Logger.getLogger(LogService.class);
- private Configuration conf;
- private Authenticator authenticator;
- private TServer service;
- private LogWriter writer_;
- private MutationLogger.Iface writer;
+ private final Instance instance;
+ private final Authenticator authenticator;
+ private final TServer service;
+ private final LogWriter writer_;
+ private final MutationLogger.Iface writer;
+ private ShutdownState shutdownState = ShutdownState.STARTED;
+ private final List<FileLock> fileLocks = new ArrayList<FileLock>();
+ private final String addressString;
enum ShutdownState {
STARTED, REGISTERED, WAITING_FOR_HALT, HALT
};
- private ShutdownState shutdownState = ShutdownState.STARTED;
-
synchronized void switchState(ShutdownState state) {
LOG.info("Switching from " + shutdownState + " to " + state);
shutdownState = state;
@@ -111,19 +112,19 @@ public class LogService implements Mutat
throw new LoggerClosedException();
}
- private List<FileLock> fileLocks = new ArrayList<FileLock>();
-
- private final String addressString;
-
public static void main(String[] args) throws Exception {
LogService logService;
+ FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration());
+ Accumulo.init(fs, "logger");
+ String hostname = Accumulo.getLocalAddress(args);
try {
- logService = new LogService(args);
+ logService = new LogService(HdfsZooInstance.getInstance(), fs, hostname);
} catch (Exception e) {
LOG.fatal("Failed to initialize log service args=" +
Arrays.asList(args), e);
throw e;
}
+ Accumulo.enableTracing(hostname, "logger");
try {
logService.run();
} catch (Exception ex) {
@@ -131,24 +132,11 @@ public class LogService implements Mutat
}
}
- public LogService(String[] args) throws UnknownHostException,
KeeperException, InterruptedException, IOException {
- try {
- Accumulo.init("logger");
- } catch (UnknownHostException e1) {
- LOG.error("Error reading logging configuration");
- }
-
+ public LogService(Instance instance, FileSystem fs, String hostname) throws
UnknownHostException, KeeperException, InterruptedException, IOException {
+ this.instance = instance;
FileSystemMonitor.start(Property.LOGGER_MONITOR_FS);
- conf = CachedConfiguration.getInstance();
- FileSystem fs = null;
- try {
- fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf,
ServerConfiguration.getSiteConfiguration()));
- } catch (IOException e) {
- String msg = "Exception connecting to FileSystem";
- LOG.error(msg, e);
- throw new RuntimeException(msg);
- }
+ fs = TraceFileSystem.wrap(fs);
final Set<String> rootDirs = new HashSet<String>();
for (String root :
ServerConfiguration.getSystemConfiguration().get(Property.LOGGER_DIR).split(","))
{
if (!root.startsWith("/"))
@@ -188,7 +176,7 @@ public class LogService implements Mutat
int poolSize =
ServerConfiguration.getSystemConfiguration().getCount(Property.LOGGER_COPY_THREADPOOL_SIZE);
boolean archive =
ServerConfiguration.getSystemConfiguration().getBoolean(Property.LOGGER_ARCHIVE);
AccumuloConfiguration acuConf =
ServerConfiguration.getSystemConfiguration();
- writer_ = new LogWriter(acuConf, fs, rootDirs,
HdfsZooInstance.getInstance().getInstanceID(), poolSize, archive);
+ writer_ = new LogWriter(acuConf, fs, rootDirs, instance.getInstanceID(),
poolSize, archive);
InvocationHandler h = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
@@ -225,11 +213,10 @@ public class LogService implements Mutat
ServerPort sp = TServerUtils.startServer(Property.LOGGER_PORT, processor,
this.getClass().getSimpleName(), "Logger Client Service Handler",
Property.LOGGER_PORTSEARCH, Property.LOGGER_MINTHREADS,
Property.LOGGER_THREADCHECK);
service = sp.server;
- InetSocketAddress address = new
InetSocketAddress(Accumulo.getLocalAddress(args), sp.port);
+ InetSocketAddress address = new InetSocketAddress(hostname, sp.port);
addressString = AddressUtil.toString(address);
registerInZooKeeper(Constants.ZLOGGERS);
this.switchState(ShutdownState.REGISTERED);
- Accumulo.enableTracing(address.getHostName(), "logger");
}
public void run() {
@@ -252,7 +239,7 @@ public class LogService implements Mutat
void registerInZooKeeper(String zooDir) {
try {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
- String path = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + zooDir;
+ String path = ZooUtil.getRoot(instance) + zooDir;
path += "/logger-";
path = zoo.putEphemeralSequential(path, addressString.getBytes());
zoo.exists(path, this);
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
Wed Apr 4 19:10:01 2012
@@ -198,9 +198,9 @@ public class Master implements LiveTServ
final private static int MAX_TSERVER_WORK_CHUNK = 5000;
final private static int MAX_BAD_STATUS_COUNT = 3;
+ final private FileSystem fs;
final private Instance instance;
final private String hostname;
- final private FileSystem fs;
final private LiveTServerSet tserverSet;
final private List<TabletGroupWatcher> watchers = new
ArrayList<TabletGroupWatcher>();
final private Authenticator authenticator;
@@ -279,7 +279,7 @@ public class Master implements LiveTServ
private void upgradeZookeeper() {
- if (Accumulo.getAccumuloPersistentVersion() ==
Constants.PREV_DATA_VERSION) {
+ if (Accumulo.getAccumuloPersistentVersion(fs) ==
Constants.PREV_DATA_VERSION) {
try {
log.info("Upgrading zookeeper");
@@ -332,7 +332,7 @@ public class Master implements LiveTServ
private AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false);
private void upgradeMetadata() {
- if (Accumulo.getAccumuloPersistentVersion() ==
Constants.PREV_DATA_VERSION) {
+ if (Accumulo.getAccumuloPersistentVersion(fs) ==
Constants.PREV_DATA_VERSION) {
if (upgradeMetadataRunning.compareAndSet(false, true)) {
Runnable upgradeTask = new Runnable() {
@Override
@@ -354,7 +354,7 @@ public class Master implements LiveTServ
bw.close();
- Accumulo.updateAccumuloVersion();
+ Accumulo.updateAccumuloVersion(fs);
log.info("Upgrade complete");
@@ -528,26 +528,18 @@ public class Master implements LiveTServ
return instance;
}
- public Master(String[] args) throws IOException {
-
- Accumulo.init("master");
-
+ public Master(Instance instance, FileSystem fs, String hostname) throws
IOException {
+ this.instance = instance;
+ this.fs = TraceFileSystem.wrap(fs);
+ this.hostname = hostname;
+
log.info("Version " + Constants.VERSION);
- instance = HdfsZooInstance.getInstance();
log.info("Instance " + instance.getInstanceID());
-
ThriftTransportPool.getInstance().setIdleTime(ServerConfiguration.getSiteConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
-
- hostname = Accumulo.getLocalAddress(args).getHostName();
- fs =
TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration()));
- ;
authenticator = ZKAuthenticator.getInstance();
-
tserverSet = new LiveTServerSet(instance, this);
-
this.tabletBalancer =
createInstanceFromPropertyName(Property.MASTER_TABLET_BALANCER,
TabletBalancer.class, new DefaultLoadBalancer());
this.loggerBalancer =
createInstanceFromPropertyName(Property.MASTER_LOGGER_BALANCER,
LoggerBalancer.class, new SimpleLoggerBalancer());
- Accumulo.enableTracing(hostname, "master");
}
public TServerConnection getConnection(TServerInstance server) {
@@ -2151,7 +2143,11 @@ public class Master implements LiveTServ
public static void main(String[] args) throws Exception {
try {
- Master master = new Master(args);
+ FileSystem fs =
FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration());
+ Accumulo.init(fs, "master");
+ String hostname = Accumulo.getLocalAddress(args);
+ Master master = new Master(HdfsZooInstance.getInstance(), fs, hostname);
+ Accumulo.enableTracing(hostname, "master");
master.run();
} catch (Exception ex) {
log.error("Unexpected exception, exiting", ex);
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
Wed Apr 4 19:10:01 2012
@@ -17,12 +17,16 @@
package org.apache.accumulo.server.master.state;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileSystem;
public class SetGoalState {
@@ -34,7 +38,8 @@ public class SetGoalState {
System.err.println("Usage: accumulo " + SetGoalState.class.getName() + "
[NORMAL|SAFE_MODE|CLEAN_STOP]");
System.exit(-1);
}
- Accumulo.waitForZookeeperAndHdfs();
+ FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration());
+ Accumulo.waitForZookeeperAndHdfs(fs);
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance())
+ Constants.ZMASTER_GOAL_STATE, args[0].getBytes(),
NodeExistsPolicy.OVERWRITE);
}
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
Wed Apr 4 19:10:01 2012
@@ -17,7 +17,6 @@
package org.apache.accumulo.server.monitor;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -32,6 +31,7 @@ import org.apache.accumulo.core.Constant
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.gc.thrift.GCMonitorService;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.master.thrift.Compacting;
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.Pair;
@@ -68,6 +69,7 @@ import org.apache.accumulo.server.proble
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.EmbeddedWebServer;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -137,6 +139,8 @@ public class Monitor {
private static Exception problemException;
private static GCStatus gcStatus;
+ private static Instance instance;
+
public static Map<String,Double> summarizeTableStats(MasterMonitorInfo mmi) {
Map<String,Double> compactingByTable = new HashMap<String,Double>();
if (mmi != null && mmi.tServerInfo != null) {
@@ -421,13 +425,19 @@ public class Monitor {
return result;
}
- public static void main(String[] args) {
- new Monitor().run(args);
+ public static void main(String[] args) throws Exception {
+ FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration());
+ Accumulo.init(fs, "monitor");
+ String hostname = Accumulo.getLocalAddress(args);
+ instance = HdfsZooInstance.getInstance();
+ Monitor monitor = new Monitor();
+ Accumulo.enableTracing(hostname, "monitor");
+ monitor.run(hostname);
}
private static long START_TIME;
- public void run(String[] args) {
+ public void run(String hostname) {
Monitor.START_TIME = System.currentTimeMillis();
int port =
ServerConfiguration.getSystemConfiguration().getPort(Property.MONITOR_PORT);
EmbeddedWebServer server;
@@ -475,14 +485,6 @@ public class Monitor {
}
}), "Data fetcher").start();
-
- try {
- Accumulo.init("monitor");
- Accumulo.enableTracing(Accumulo.getLocalAddress(args).toString(),
"monitor");
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
-
}
public static MasterMonitorInfo getMmi() {
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
Wed Apr 4 19:10:01 2012
@@ -41,8 +41,8 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.InterruptibleIterator;
import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator;
-import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
import
org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.problems.ProblemReport;
@@ -50,7 +50,6 @@ import org.apache.accumulo.server.proble
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -103,7 +102,6 @@ public class FileManager {
private Semaphore filePermits;
private FileSystem fs;
- private Configuration conf;
// the data cache and index cache are allocated in
// TabletResourceManager and passed through the file opener to
@@ -163,7 +161,7 @@ public class FileManager {
* @param indexCache
* : underlying file can and should be able to handle a null cache
*/
- FileManager(Configuration conf, FileSystem fs, int maxOpen, BlockCache
dataCache, BlockCache indexCache) {
+ FileManager(FileSystem fs, int maxOpen, BlockCache dataCache, BlockCache
indexCache) {
if (maxOpen <= 0)
throw new IllegalArgumentException("maxOpen <= 0");
@@ -174,7 +172,6 @@ public class FileManager {
this.filePermits = new Semaphore(maxOpen, true);
this.maxOpen = maxOpen;
this.fs = fs;
- this.conf = conf;
this.openFiles = new HashMap<String,List<OpenReader>>();
this.reservedReaders = new HashMap<FileSKVIterator,String>();
@@ -311,7 +308,8 @@ public class FileManager {
for (String file : filesToOpen) {
try {
// log.debug("Opening "+file);
- FileSKVIterator reader = FileOperations.getInstance().openReader(file,
false, fs, conf, ServerConfiguration.getTableConfiguration(table.toString()),
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file,
false, fs, fs.getConf(),
+ ServerConfiguration.getTableConfiguration(table.toString()),
dataCache, indexCache);
reservedFiles.add(reader);
readersReserved.put(reader, file);
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
Wed Apr 4 19:10:01 2012
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
@@ -69,6 +68,7 @@ import org.apache.accumulo.cloudtrace.th
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.TabletType;
import org.apache.accumulo.core.client.impl.Translator;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -188,7 +188,6 @@ import org.apache.accumulo.server.zookee
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.Platform;
import org.apache.accumulo.start.classloader.AccumuloClassLoader;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -221,8 +220,10 @@ public class TabletServer extends Abstra
protected TabletServerMinCMetrics mincMetrics = new
TabletServerMinCMetrics();
- public TabletServer() {
+ public TabletServer(Instance instance, FileSystem fs) {
super();
+ this.instance = instance;
+ this.fs = TraceFileSystem.wrap(fs);
SimpleTimer.getInstance().schedule(new TimerTask() {
@Override
public void run() {
@@ -1774,7 +1775,7 @@ public class TabletServer extends Abstra
});
}
- ZooUtil.LockID lid = new
ZooUtil.LockID(ZooUtil.getRoot(HdfsZooInstance.getInstance()) +
Constants.ZMASTER_LOCK, lock);
+ ZooUtil.LockID lid = new ZooUtil.LockID(ZooUtil.getRoot(instance) +
Constants.ZMASTER_LOCK, lock);
try {
if (!ZooLock.isLockHeld(masterLockCache, lid)) {
@@ -2479,7 +2480,7 @@ public class TabletServer extends Abstra
}
private FileSystem fs;
- private Configuration conf;
+ private Instance instance;
private ZooCache cache;
private SortedMap<KeyExtent,Tablet> onlineTablets =
Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
@@ -2513,7 +2514,7 @@ public class TabletServer extends Abstra
public Set<String> getLoggers() throws TException,
MasterNotRunningException, ThriftSecurityException {
Set<String> allLoggers = new HashSet<String>();
- String dir = ZooUtil.getRoot(HdfsZooInstance.getInstance()) +
Constants.ZLOGGERS;
+ String dir = ZooUtil.getRoot(instance) + Constants.ZLOGGERS;
for (String child : cache.getChildren(dir)) {
allLoggers.add(new String(cache.get(dir + "/" + child)));
}
@@ -2562,7 +2563,7 @@ public class TabletServer extends Abstra
private String getMasterAddress() {
try {
- List<String> locations =
HdfsZooInstance.getInstance().getMasterLocations();
+ List<String> locations = instance.getMasterLocations();
if (locations.size() == 0)
return null;
return locations.get(0);
@@ -2609,7 +2610,7 @@ public class TabletServer extends Abstra
private void announceExistence() {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
try {
- String zPath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) +
Constants.ZTSERVERS + "/" + getClientAddressString();
+ String zPath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" +
getClientAddressString();
zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
@@ -2929,26 +2930,10 @@ public class TabletServer extends Abstra
}
}
- public void config(String[] args) throws UnknownHostException {
- InetAddress local = Accumulo.getLocalAddress(args);
-
- try {
- Accumulo.init("tserver");
- log.info("Tablet server starting on " + local.getHostAddress());
-
- conf = CachedConfiguration.getInstance();
- fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf,
ServerConfiguration.getSiteConfiguration()));
-
- authenticator = ZKAuthenticator.getInstance();
-
- if (args.length > 0)
- conf.set("tabletserver.hostname", args[0]);
- Accumulo.enableTracing(local.getHostName(), "tserver");
- } catch (IOException e) {
- log.fatal("couldn't get a reference to the filesystem. quitting");
- throw new RuntimeException(e);
- }
- clientAddress = new InetSocketAddress(local, 0);
+ public void config(String hostname) {
+ log.info("Tablet server starting on " + hostname);
+ authenticator = ZKAuthenticator.getInstance();
+ clientAddress = new InetSocketAddress(hostname, 0);
logger = new TabletServerLogger(this,
ServerConfiguration.getSystemConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE));
if
(ServerConfiguration.getSystemConfiguration().getBoolean(Property.TSERV_LOCK_MEMORY))
{
@@ -2977,7 +2962,7 @@ public class TabletServer extends Abstra
SimpleTimer.getInstance().schedule(gcDebugTask, 0, 1000);
- this.resourceManager = new TabletServerResourceManager(conf, fs);
+ this.resourceManager = new TabletServerResourceManager(fs);
lastPingTime = System.currentTimeMillis();
@@ -3102,8 +3087,13 @@ public class TabletServer extends Abstra
public static void main(String[] args) throws IOException {
try {
- TabletServer server = new TabletServer();
- server.config(args);
+ FileSystem fs =
FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration());
+ Accumulo.init(fs, "tserver");
+ String hostname = Accumulo.getLocalAddress(args);
+ Instance instance = HdfsZooInstance.getInstance();
+ TabletServer server = new TabletServer(instance, fs);
+ server.config(hostname);
+ Accumulo.enableTracing(hostname, "tserver");
server.run();
} catch (Exception ex) {
log.error("Uncaught exception in TabletServer.main, exiting", ex);
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
Wed Apr 4 19:10:01 2012
@@ -43,14 +43,13 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
import org.apache.accumulo.server.util.NamingThreadFactory;
import org.apache.accumulo.start.classloader.AccumuloClassLoader;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
@@ -115,7 +114,7 @@ public class TabletServerResourceManager
return addEs(name, new ThreadPoolExecutor(min, max, timeout,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new
NamingThreadFactory(name)));
}
- public TabletServerResourceManager(Configuration conf, FileSystem fs) {
+ public TabletServerResourceManager(FileSystem fs) {
this.acuConf = ServerConfiguration.getSystemConfiguration();
@@ -171,7 +170,7 @@ public class TabletServerResourceManager
int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
- fileManager = new FileManager(conf, fs, maxOpenFiles, _dCache, _iCache);
+ fileManager = new FileManager(fs, maxOpenFiles, _dCache, _iCache);
try {
Class<? extends MemoryManager> clazz =
AccumuloClassLoader.loadClass(ServerConfiguration.getSystemConfiguration().get(Property.TSERV_MEM_MGMT),
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
Wed Apr 4 19:10:01 2012
@@ -27,13 +27,16 @@ import org.apache.accumulo.cloudtrace.th
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.trace.TraceFormatter;
import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.Accumulo;
@@ -42,6 +45,7 @@ import org.apache.accumulo.server.conf.S
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TByteArrayOutputStream;
@@ -63,6 +67,7 @@ public class TraceServer implements Watc
final private static Logger log = Logger.getLogger(TraceServer.class);
final private AccumuloConfiguration conf;
+ final private Instance instance;
final private TServer server;
private BatchWriter writer = null;
private Connector connector;
@@ -143,13 +148,13 @@ public class TraceServer implements Watc
}
- public TraceServer(String args[]) throws Exception {
- Accumulo.init("tracer");
+ public TraceServer(Instance instance, String hostname) throws Exception {
+ this.instance = instance;
conf = ServerConfiguration.getSystemConfiguration();
table = conf.get(Property.TRACE_TABLE);
while (true) {
try {
- connector =
HdfsZooInstance.getInstance().getConnector(conf.get(Property.TRACE_USER),
conf.get(Property.TRACE_PASSWORD).getBytes());
+ connector = instance.getConnector(conf.get(Property.TRACE_USER),
conf.get(Property.TRACE_PASSWORD).getBytes());
if (!connector.tableOperations().exists(table)) {
connector.tableOperations().create(table);
}
@@ -169,7 +174,7 @@ public class TraceServer implements Watc
TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
options.processor(new SpanReceiver.Processor(new Receiver()));
server = new TThreadPoolServer(options);
- final InetSocketAddress address = new
InetSocketAddress(Accumulo.getLocalAddress(args), sock.getLocalPort());
+ final InetSocketAddress address = new InetSocketAddress(hostname,
sock.getLocalPort());
registerInZooKeeper(AddressUtil.toString(address));
writer = connector.createBatchWriter(table, 100l * 1024 * 1024, 5 * 1000l,
10);
@@ -212,14 +217,19 @@ public class TraceServer implements Watc
private void registerInZooKeeper(String name) throws Exception {
- String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) +
Constants.ZTRACERS;
+ String root = ZooUtil.getRoot(instance) + Constants.ZTRACERS;
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String path = zoo.putEphemeralSequential(root + "/trace-",
name.getBytes());
zoo.exists(path, this);
}
public static void main(String[] args) throws Exception {
- TraceServer server = new TraceServer(args);
+ FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(),
ServerConfiguration.getSiteConfiguration());
+ Accumulo.init(fs, "tracer");
+ String hostname = Accumulo.getLocalAddress(args);
+ Instance instance = HdfsZooInstance.getInstance();
+ TraceServer server = new TraceServer(instance, hostname);
+ Accumulo.enableTracing(hostname, "tserver");
server.run();
log.info("tracer stopping");
}
Modified:
accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java?rev=1309550&r1=1309549&r2=1309550&view=diff
==============================================================================
---
accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
(original)
+++
accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
Wed Apr 4 19:10:01 2012
@@ -100,7 +100,7 @@ public class TestConfirmDeletes {
load(instance, metadata, deletes);
- SimpleGarbageCollector gc = new SimpleGarbageCollector(new String[] {});
+ SimpleGarbageCollector gc = new SimpleGarbageCollector();
gc.init(fs, instance, auth, aconf);
SortedSet<String> candidates = gc.getCandidates();
Assert.assertEquals(expectedInitial, candidates.size());