Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
minicluster/src/main/java/org/apache/accumulo/minicluster/MiniAccumuloCluster.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9e32bf8b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9e32bf8b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9e32bf8b
Branch: refs/heads/master
Commit: 9e32bf8b11bd73d49faee31eddf5bc395f20e611
Parents: 722d6be 09ce814
Author: Josh Elser <[email protected]>
Authored: Mon Feb 3 15:48:06 2014 -0500
Committer: Josh Elser <[email protected]>
Committed: Mon Feb 3 15:48:06 2014 -0500
----------------------------------------------------------------------
.../minicluster/impl/MiniAccumuloClusterImpl.java | 1 +
.../minicluster/impl/MiniAccumuloConfigImpl.java | 16 ++++++++--------
2 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e32bf8b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
----------------------------------------------------------------------
diff --cc
minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
index fa305e5,0000000..c1b3b8b
mode 100644,000000..100644
---
a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java
@@@ -1,642 -1,0 +1,643 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.minicluster.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.master.state.SetGoalState;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.util.PortUtils;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.start.Main;
+import org.apache.accumulo.start.classloader.vfs.MiniDFSUtil;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.impl.VFSClassLoader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+
+/**
+ * A utility class that will create Zookeeper and Accumulo processes that
write all of their data to a single local directory. This class makes it easy
to test
+ * code against a real Accumulo instance. It's much more accurate for testing
than {@link org.apache.accumulo.core.client.mock.MockAccumulo}, but much slower.
+ *
+ * @since 1.5.0
+ */
+public class MiniAccumuloClusterImpl {
+
+ public static class LogWriter extends Daemon {
+ private BufferedReader in;
+ private BufferedWriter out;
+
+ public LogWriter(InputStream stream, File logFile) throws IOException {
+ this.in = new BufferedReader(new InputStreamReader(stream));
+ out = new BufferedWriter(new FileWriter(logFile));
+
+ SimpleTimer.getInstance().schedule(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }, 1000, 1000);
+ }
+
+ public synchronized void flush() throws IOException {
+ if (out != null)
+ out.flush();
+ }
+
+ @Override
+ public void run() {
+ String line;
+
+ try {
+ while ((line = in.readLine()) != null) {
+ out.append(line);
+ out.append("\n");
+ }
+
+ synchronized (this) {
+ out.close();
+ out = null;
+ in.close();
+ }
+
+ } catch (IOException e) {}
+ }
+ }
+
+ private boolean initialized = false;
+ private Process zooKeeperProcess = null;
+ private Process masterProcess = null;
+ private Process gcProcess = null;
+ private List<Process> tabletServerProcesses =
Collections.synchronizedList(new ArrayList<Process>());
+
+ private Set<Pair<ServerType,Integer>> debugPorts = new
HashSet<Pair<ServerType,Integer>>();
+
+ private File zooCfgFile;
+ private String dfsUri;
+
+ public List<LogWriter> getLogWriters() {
+ return logWriters;
+ }
+
+ private List<LogWriter> logWriters = new
ArrayList<MiniAccumuloClusterImpl.LogWriter>();
+
+ private MiniAccumuloConfigImpl config;
+ private MiniDFSCluster miniDFS = null;
+ private List<Process> cleanup = new ArrayList<Process>();
+
+ public Process exec(Class<?> clazz, String... args) throws IOException {
+ return exec(clazz, null, args);
+ }
+
+ public Process exec(Class<?> clazz, List<String> jvmArgs, String... args)
throws IOException {
+ ArrayList<String> jvmArgs2 = new ArrayList<String>(1 + (jvmArgs == null ?
0 : jvmArgs.size()));
+ jvmArgs2.add("-Xmx" + config.getDefaultMemory());
+ if (jvmArgs != null)
+ jvmArgs2.addAll(jvmArgs);
+ Process proc = _exec(clazz, jvmArgs2, args);
+ cleanup.add(proc);
+ return proc;
+ }
+
+ private boolean containsSiteFile(File f) {
+ return f.isDirectory() && f.listFiles(new FileFilter() {
+
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().endsWith("site.xml");
+ }
+ }).length > 0;
+ }
+
+ private void append(StringBuilder classpathBuilder, URL url) throws
URISyntaxException {
+ File file = new File(url.toURI());
+ // do not include dirs containing hadoop or accumulo site files
+ if (!containsSiteFile(file))
+
classpathBuilder.append(File.pathSeparator).append(file.getAbsolutePath());
+ }
+
+ private String getClasspath() throws IOException {
+
+ try {
+ ArrayList<ClassLoader> classloaders = new ArrayList<ClassLoader>();
+
+ ClassLoader cl = this.getClass().getClassLoader();
+
+ while (cl != null) {
+ classloaders.add(cl);
+ cl = cl.getParent();
+ }
+
+ Collections.reverse(classloaders);
+
+ StringBuilder classpathBuilder = new StringBuilder();
+ classpathBuilder.append(config.getConfDir().getAbsolutePath());
+
+ if (config.getClasspathItems() == null) {
+
+ // assume 0 is the system classloader and skip it
+ for (int i = 1; i < classloaders.size(); i++) {
+ ClassLoader classLoader = classloaders.get(i);
+
+ if (classLoader instanceof URLClassLoader) {
+
+ URLClassLoader ucl = (URLClassLoader) classLoader;
+
+ for (URL u : ucl.getURLs()) {
+ append(classpathBuilder, u);
+ }
+
+ } else if (classLoader instanceof VFSClassLoader) {
+
+ VFSClassLoader vcl = (VFSClassLoader) classLoader;
+ for (FileObject f : vcl.getFileObjects()) {
+ append(classpathBuilder, f.getURL());
+ }
+ } else {
+ throw new IllegalArgumentException("Unknown classloader type : "
+ classLoader.getClass().getName());
+ }
+ }
+ } else {
+ for (String s : config.getClasspathItems())
+ classpathBuilder.append(File.pathSeparator).append(s);
+ }
+
+ return classpathBuilder.toString();
+
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private Process _exec(Class<?> clazz, List<String> extraJvmOpts, String...
args) throws IOException {
+ String javaHome = System.getProperty("java.home");
+ String javaBin = javaHome + File.separator + "bin" + File.separator +
"java";
+ String classpath = getClasspath();
+
+ String className = clazz.getName();
+
+ ArrayList<String> argList = new ArrayList<String>();
+ argList.addAll(Arrays.asList(javaBin, "-Dproc=" + clazz.getSimpleName(),
"-cp", classpath));
+ argList.addAll(extraJvmOpts);
+ for (Entry<String,String> sysProp :
config.getSystemProperties().entrySet()) {
+ argList.add(String.format("-D%s=%s", sysProp.getKey(),
sysProp.getValue()));
+ }
+ argList.addAll(Arrays.asList("-XX:+UseConcMarkSweepGC",
"-XX:CMSInitiatingOccupancyFraction=75", "-Dapple.awt.UIElement=true",
Main.class.getName(), className));
+ argList.addAll(Arrays.asList(args));
+
+ ProcessBuilder builder = new ProcessBuilder(argList);
+
+ builder.environment().put("ACCUMULO_HOME",
config.getDir().getAbsolutePath());
+ builder.environment().put("ACCUMULO_LOG_DIR",
config.getLogDir().getAbsolutePath());
+ builder.environment().put("ACCUMULO_CLIENT_CONF_PATH",
config.getClientConfFile().getAbsolutePath());
+ String ldLibraryPath =
StringUtil.join(Arrays.asList(config.getNativeLibPaths()), File.pathSeparator);
+ builder.environment().put("LD_LIBRARY_PATH", ldLibraryPath);
+ builder.environment().put("DYLD_LIBRARY_PATH", ldLibraryPath);
+
+ // if we're running under accumulo.start, we forward these env vars
+ String env = System.getenv("HADOOP_PREFIX");
+ if (env != null)
+ builder.environment().put("HADOOP_PREFIX", env);
+ env = System.getenv("ZOOKEEPER_HOME");
+ if (env != null)
+ builder.environment().put("ZOOKEEPER_HOME", env);
+ builder.environment().put("ACCUMULO_CONF_DIR",
config.getConfDir().getAbsolutePath());
+ // hadoop-2.2 puts error messages in the logs if this is not set
+ builder.environment().put("HADOOP_HOME",
config.getDir().getAbsolutePath());
+
+ Process process = builder.start();
+
+ LogWriter lw;
+ lw = new LogWriter(process.getErrorStream(), new File(config.getLogDir(),
clazz.getSimpleName() + "_" + process.hashCode() + ".err"));
+ logWriters.add(lw);
+ lw.start();
+ lw = new LogWriter(process.getInputStream(), new File(config.getLogDir(),
clazz.getSimpleName() + "_" + process.hashCode() + ".out"));
+ logWriters.add(lw);
+ lw.start();
+
+ return process;
+ }
+
+ private Process _exec(Class<?> clazz, ServerType serverType, String...
args) throws IOException {
+
+ List<String> jvmOpts = new ArrayList<String>();
+ jvmOpts.add("-Xmx" + config.getMemory(serverType));
+
+ if (config.isJDWPEnabled()) {
+ Integer port = PortUtils.getRandomFreePort();
+ jvmOpts.addAll(buildRemoteDebugParams(port));
+ debugPorts.add(new Pair<ServerType,Integer>(serverType, port));
+ }
+ return _exec(clazz, jvmOpts, args);
+ }
+
+ /**
+ *
+ * @param dir
+ * An empty or nonexistent temp directory that Accumulo and
Zookeeper can store data in. Creating the directory is left to the user. Java
7, Guava,
+ * and Junit provide methods for creating temporary directories.
+ * @param rootPassword
+ * Initial root password for instance.
+ */
+ public MiniAccumuloClusterImpl(File dir, String rootPassword) throws
IOException {
+ this(new MiniAccumuloConfigImpl(dir, rootPassword));
+ }
+
+ /**
+ * @param config
+ * initial configuration
+ */
+ @SuppressWarnings("deprecation")
+ public MiniAccumuloClusterImpl(MiniAccumuloConfigImpl config) throws
IOException {
+
+ this.config = config.initialize();
+
+ config.getConfDir().mkdirs();
+ config.getAccumuloDir().mkdirs();
+ config.getZooKeeperDir().mkdirs();
+ config.getLogDir().mkdirs();
+ config.getWalogDir().mkdirs();
+ config.getLibDir().mkdirs();
++ config.getLibExtDir().mkdirs();
+
+ if (config.useMiniDFS()) {
+ File nn = new File(config.getAccumuloDir(), "nn");
+ nn.mkdirs();
+ File dn = new File(config.getAccumuloDir(), "dn");
+ dn.mkdirs();
+ File dfs = new File(config.getAccumuloDir(), "dfs");
+ dfs.mkdirs();
+ Configuration conf = new Configuration();
+ conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nn.getAbsolutePath());
+ conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dn.getAbsolutePath());
+ conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "1");
+ conf.set("dfs.support.append", "true");
+ conf.set("dfs.datanode.synconclose", "true");
+ conf.set("dfs.datanode.data.dir.perm",
MiniDFSUtil.computeDatanodeDirectoryPermission());
+ String oldTestBuildData = System.setProperty("test.build.data",
dfs.getAbsolutePath());
+ miniDFS = new MiniDFSCluster(conf, 1, true, null);
+ if (oldTestBuildData == null)
+ System.clearProperty("test.build.data");
+ else
+ System.setProperty("test.build.data", oldTestBuildData);
+ miniDFS.waitClusterUp();
+ InetSocketAddress dfsAddress =
miniDFS.getNameNode().getNameNodeAddress();
+ dfsUri = "hdfs://" + dfsAddress.getHostName() + ":" +
dfsAddress.getPort();
+ File coreFile = new File(config.getConfDir(), "core-site.xml");
+ writeConfig(coreFile, Collections.singletonMap("fs.default.name",
dfsUri).entrySet());
+ File hdfsFile = new File(config.getConfDir(), "hdfs-site.xml");
+ writeConfig(hdfsFile, conf);
+
+ Map<String,String> siteConfig = config.getSiteConfig();
+ siteConfig.put(Property.INSTANCE_DFS_URI.getKey(), dfsUri);
+ siteConfig.put(Property.INSTANCE_DFS_DIR.getKey(), "/accumulo");
+ config.setSiteConfig(siteConfig);
+ } else {
+ dfsUri = "file://";
+ }
+
+ File clientConfFile = config.getClientConfFile();
+ // Write only the properties that correspond to ClientConfiguration
properties
+ writeConfigProperties(clientConfFile,
Maps.filterEntries(config.getSiteConfig(), new
Predicate<Entry<String,String>>() {
+ @Override
+ public boolean apply(Entry<String,String> v) {
+ return
ClientConfiguration.ClientProperty.getPropertyByKey(v.getKey()) != null;
+ }
+ }));
+
+ File siteFile = new File(config.getConfDir(), "accumulo-site.xml");
+ writeConfig(siteFile, config.getSiteConfig().entrySet());
+
+ zooCfgFile = new File(config.getConfDir(), "zoo.cfg");
+ FileWriter fileWriter = new FileWriter(zooCfgFile);
+
+ // zookeeper uses Properties to read its config, so use that to write in
order to properly escape things like Windows paths
+ Properties zooCfg = new Properties();
+ zooCfg.setProperty("tickTime", "2000");
+ zooCfg.setProperty("initLimit", "10");
+ zooCfg.setProperty("syncLimit", "5");
+ zooCfg.setProperty("clientPort", config.getZooKeeperPort() + "");
+ zooCfg.setProperty("maxClientCnxns", "1000");
+ zooCfg.setProperty("dataDir", config.getZooKeeperDir().getAbsolutePath());
+ zooCfg.store(fileWriter, null);
+
+ fileWriter.close();
+ }
+
+ private void writeConfig(File file, Iterable<Map.Entry<String,String>>
settings) throws IOException {
+ FileWriter fileWriter = new FileWriter(file);
+ fileWriter.append("<configuration>\n");
+
+ for (Entry<String,String> entry : settings) {
+ String value = entry.getValue().replace("&", "&amp;").replace("<",
"&lt;").replace(">", "&gt;");
+ fileWriter.append("<property><name>" + entry.getKey() +
"</name><value>" + value + "</value></property>\n");
+ }
+ fileWriter.append("</configuration>\n");
+ fileWriter.close();
+ }
+
+ private void writeConfigProperties(File file, Map<String,String> settings)
throws IOException {
+ FileWriter fileWriter = new FileWriter(file);
+
+ for (Entry<String,String> entry : settings.entrySet())
+ fileWriter.append(entry.getKey() + "=" + entry.getValue() + "\n");
+ fileWriter.close();
+ }
+
+ /**
+ * Starts Accumulo and Zookeeper processes. Can only be called once.
+ *
+ * @throws IllegalStateException
+ * if already started
+ */
+ public void start() throws IOException, InterruptedException {
+
+ if (!initialized) {
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ MiniAccumuloClusterImpl.this.stop();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ if (zooKeeperProcess == null) {
+ zooKeeperProcess = _exec(ZooKeeperServerMain.class,
ServerType.ZOOKEEPER, zooCfgFile.getAbsolutePath());
+ }
+
+ if (!initialized) {
+ // sleep a little bit to let zookeeper come up before calling init,
seems to work better
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ try {
+ Socket s = new Socket("localhost", config.getZooKeeperPort());
+ s.getOutputStream().write("ruok\n".getBytes());
+ s.getOutputStream().flush();
+ byte buffer[] = new byte[100];
+ int n = s.getInputStream().read(buffer);
+ if (n == 4 && new String(buffer, 0, n).equals("imok"))
+ break;
+ } catch (Exception e) {
+ if (System.currentTimeMillis() - startTime >= 10000) {
+ throw new RuntimeException("Zookeeper did not start within 10
seconds . Check the logs in " + config.getLogDir() + " for errors.");
+ }
+ UtilWaitThread.sleep(250);
+ }
+ }
+ Process initProcess = exec(Initialize.class, "--instance-name",
config.getInstanceName(), "--password", config.getRootPassword());
+ int ret = initProcess.waitFor();
+ if (ret != 0) {
+ throw new RuntimeException("Initialize process returned " + ret + ".
Check the logs in " + config.getLogDir() + " for errors.");
+ }
+ initialized = true;
+ }
+ synchronized (tabletServerProcesses) {
+ for (int i = tabletServerProcesses.size(); i < config.getNumTservers();
i++) {
+ tabletServerProcesses.add(_exec(TabletServer.class,
ServerType.TABLET_SERVER));
+ }
+ }
+ int ret = 0;
+ for (int i = 0; i < 5; i++) {
+ ret = exec(Main.class, SetGoalState.class.getName(),
MasterGoalState.NORMAL.toString()).waitFor();
+ if (ret == 0)
+ break;
+ UtilWaitThread.sleep(1000);
+ }
+ if (ret != 0) {
+ throw new RuntimeException("Could not set master goal state, process
returned " + ret + ". Check the logs in " + config.getLogDir() + " for
errors.");
+ }
+ if (masterProcess == null) {
+ masterProcess = _exec(Master.class, ServerType.MASTER);
+ }
+
+ if (gcProcess == null) {
+ gcProcess = _exec(SimpleGarbageCollector.class,
ServerType.GARBAGE_COLLECTOR);
+ }
+ }
+
+ private List<String> buildRemoteDebugParams(int port) {
+ return Arrays.asList(new String[] {"-Xdebug",
String.format("-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=%d",
port)});
+ }
+
+ /**
+ * @return generated remote debug ports if in debug mode.
+ * @since 1.6.0
+ */
+ public Set<Pair<ServerType,Integer>> getDebugPorts() {
+ return debugPorts;
+ }
+
+ List<ProcessReference> references(Process... procs) {
+ List<ProcessReference> result = new ArrayList<ProcessReference>();
+ for (Process proc : procs) {
+ result.add(new ProcessReference(proc));
+ }
+ return result;
+ }
+
+ public Map<ServerType,Collection<ProcessReference>> getProcesses() {
+ Map<ServerType,Collection<ProcessReference>> result = new
HashMap<ServerType,Collection<ProcessReference>>();
+ result.put(ServerType.MASTER, references(masterProcess));
+ result.put(ServerType.TABLET_SERVER,
references(tabletServerProcesses.toArray(new Process[0])));
+ result.put(ServerType.ZOOKEEPER, references(zooKeeperProcess));
+ if (null != gcProcess) {
+ result.put(ServerType.GARBAGE_COLLECTOR, references(gcProcess));
+ }
+ return result;
+ }
+
+ public void killProcess(ServerType type, ProcessReference proc) throws
ProcessNotFoundException, InterruptedException {
+ boolean found = false;
+ switch (type) {
+ case MASTER:
+ if (proc.equals(masterProcess)) {
+ masterProcess.destroy();
+ masterProcess = null;
+ found = true;
+ }
+ break;
+ case TABLET_SERVER:
+ synchronized (tabletServerProcesses) {
+ for (Process tserver : tabletServerProcesses) {
+ if (proc.equals(tserver)) {
+ tabletServerProcesses.remove(tserver);
+ tserver.destroy();
+ found = true;
+ break;
+ }
+ }
+ }
+ break;
+ case ZOOKEEPER:
+ if (proc.equals(zooKeeperProcess)) {
+ zooKeeperProcess.destroy();
+ zooKeeperProcess = null;
+ found = true;
+ }
+ break;
+ case GARBAGE_COLLECTOR:
+ if (proc.equals(gcProcess)) {
+ gcProcess.destroy();
+ gcProcess = null;
+ found = true;
+ }
+ break;
+ }
+ if (!found)
+ throw new ProcessNotFoundException();
+ }
+
+ /**
+ * @return Accumulo instance name
+ */
+ public String getInstanceName() {
+ return config.getInstanceName();
+ }
+
+ /**
+ * @return zookeeper connection string
+ */
+ public String getZooKeepers() {
+ return config.getZooKeepers();
+ }
+
+ /**
+ * Stops Accumulo and Zookeeper processes. If stop is not called, there is
a shutdown hook that is set up to kill the processes. However, it's probably best
to
+ * call stop in a finally block as soon as possible.
+ */
+ public void stop() throws IOException, InterruptedException {
+ for (LogWriter lw : logWriters) {
+ lw.flush();
+ }
+
+ if (zooKeeperProcess != null) {
+ zooKeeperProcess.destroy();
+ }
+ if (masterProcess != null) {
+ masterProcess.destroy();
+ }
+ if (tabletServerProcesses != null) {
+ synchronized (tabletServerProcesses) {
+ for (Process tserver : tabletServerProcesses) {
+ tserver.destroy();
+ }
+ }
+ }
+ if (gcProcess != null) {
+ gcProcess.destroy();
+ }
+
+ zooKeeperProcess = null;
+ masterProcess = null;
+ gcProcess = null;
+ tabletServerProcesses.clear();
+ if (config.useMiniDFS() && miniDFS != null)
+ miniDFS.shutdown();
+ for (Process p : cleanup)
+ p.destroy();
+ miniDFS = null;
+ }
+
+ /**
+ * @since 1.6.0
+ */
+ public MiniAccumuloConfigImpl getConfig() {
+ return config;
+ }
+
+ /**
+ * Utility method to get a connector to the MAC.
+ *
+ * @since 1.6.0
+ */
+ public Connector getConnector(String user, String passwd) throws
AccumuloException, AccumuloSecurityException {
+ Instance instance = new ZooKeeperInstance(getClientConfig());
+ return instance.getConnector(user, new PasswordToken(passwd));
+ }
+
+ public ClientConfiguration getClientConfig() {
+ return new ClientConfiguration(Arrays.asList(new
MapConfiguration(config.getSiteConfig()))).withInstance(this.getInstanceName()).withZkHosts(
+ this.getZooKeepers());
+ }
+
+ public FileSystem getFileSystem() {
+ try {
+ return FileSystem.get(new URI(dfsUri), new Configuration());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9e32bf8b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
----------------------------------------------------------------------
diff --cc
minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
index f1b23e4,0000000..04d9f6e
mode 100644,000000..100644
---
a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
@@@ -1,445 -1,0 +1,445 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.minicluster.impl;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.server.util.PortUtils;
+
+/**
+ * Holds configuration for {@link MiniAccumuloClusterImpl}. Required
configurations must be passed to constructor(s) and all other configurations
are optional.
+ *
+ * @since 1.5.0
+ */
+public class MiniAccumuloConfigImpl {
+
+ private static final String DEFAULT_INSTANCE_SECRET = "DONTTELL";
+
+ private File dir = null;
+ private String rootPassword = null;
+ private Map<String,String> siteConfig = new HashMap<String,String>();
+ private int numTservers = 2;
+ private Map<ServerType,Long> memoryConfig = new HashMap<ServerType,Long>();
+ private boolean jdwpEnabled = false;
+ private Map<String,String> systemProperties = new HashMap<String,String>();
+
+ private String instanceName = "miniInstance";
+
+ private File libDir;
++ private File libExtDir;
+ private File confDir;
+ private File zooKeeperDir;
+ private File accumuloDir;
+ private File logDir;
+ private File walogDir;
+
+ private Integer zooKeeperPort;
+ private long defaultMemorySize = 128 * 1024 * 1024;
+
+ private boolean initialized = false;
+
+ private boolean useMiniDFS = false;
+
+ private String[] classpathItems = null;
+
+ private String[] nativePathItems = null;
+
+ /**
+ * @param dir
+ * An empty or nonexistent directory that Accumulo and Zookeeper
can store data in. Creating the directory is left to the user. Java 7, Guava,
and
+ * Junit provide methods for creating temporary directories.
+ * @param rootPassword
+ * The initial password for the Accumulo root user
+ */
+ public MiniAccumuloConfigImpl(File dir, String rootPassword) {
+ this.dir = dir;
+ this.rootPassword = rootPassword;
+ }
+
+ /**
+ * Set directories and fully populate site config
+ */
+ MiniAccumuloConfigImpl initialize() {
+
+ // Sanity checks
+ if (this.getDir().exists() && !this.getDir().isDirectory())
+ throw new IllegalArgumentException("Must pass in directory, " +
this.getDir() + " is a file");
+
+ String[] dirList;
+ if (this.getDir().exists() && (dirList = this.getDir().list()).length !=
0)
+ if (!(dirList.length == 1 && dirList[0].equals("ssl")))
+ throw new IllegalArgumentException("Directory " + this.getDir() + "
is not empty");
+
+ if (!initialized) {
+ libDir = new File(dir, "lib");
++ libExtDir = new File(libDir, "ext");
+ confDir = new File(dir, "conf");
+ accumuloDir = new File(dir, "accumulo");
+ zooKeeperDir = new File(dir, "zookeeper");
+ logDir = new File(dir, "logs");
+ walogDir = new File(dir, "walogs");
+
- String[] paths = {"$ACCUMULO_HOME/lib/.*.jar",
"$ZOOKEEPER_HOME/zookeeper[^.].*.jar", "$HADOOP_PREFIX/[^.].*.jar",
"$HADOOP_PREFIX/lib/[^.].*.jar",
- "$HADOOP_PREFIX/share/hadoop/common/.*.jar",
"$HADOOP_PREFIX/share/hadoop/common/lib/.*.jar",
"$HADOOP_PREFIX/share/hadoop/hdfs/.*.jar",
- "$HADOOP_PREFIX/share/hadoop/mapreduce/.*.jar"};
-
- String classpath = StringUtil.join(Arrays.asList(paths), ",");
-
+ mergeProp(Property.INSTANCE_DFS_URI.getKey(), "file:///");
+ mergeProp(Property.INSTANCE_DFS_DIR.getKey(),
accumuloDir.getAbsolutePath());
+ mergeProp(Property.INSTANCE_SECRET.getKey(), DEFAULT_INSTANCE_SECRET);
+ mergeProp(Property.TSERV_PORTSEARCH.getKey(), "true");
+ mergeProp(Property.LOGGER_DIR.getKey(), walogDir.getAbsolutePath());
+ mergeProp(Property.TSERV_DATACACHE_SIZE.getKey(), "10M");
+ mergeProp(Property.TSERV_INDEXCACHE_SIZE.getKey(), "10M");
+ mergeProp(Property.TSERV_MAXMEM.getKey(), "50M");
+ mergeProp(Property.TSERV_WALOG_MAX_SIZE.getKey(), "100M");
+ mergeProp(Property.TSERV_NATIVEMAP_ENABLED.getKey(), "false");
+ mergeProp(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey() + ".password",
getRootPassword());
+ // since there is a small amount of memory, check more frequently for
majc... setting may not be needed in 1.5
+ mergeProp(Property.TSERV_MAJC_DELAY.getKey(), "3");
- mergeProp(Property.GENERAL_CLASSPATHS.getKey(), classpath);
- mergeProp(Property.GENERAL_DYNAMIC_CLASSPATHS.getKey(),
libDir.getAbsolutePath() + "/[^.].*[.]jar");
++ mergeProp(Property.GENERAL_CLASSPATHS.getKey(),
libDir.getAbsolutePath() + "/[^.].*[.]jar");
++ mergeProp(Property.GENERAL_DYNAMIC_CLASSPATHS.getKey(),
libExtDir.getAbsolutePath() + "/[^.].*[.]jar");
+ mergeProp(Property.GC_CYCLE_DELAY.getKey(), "4s");
+ mergeProp(Property.GC_CYCLE_START.getKey(), "0s");
+ mergePropWithRandomPort(Property.MASTER_CLIENTPORT.getKey());
+ mergePropWithRandomPort(Property.TRACE_PORT.getKey());
+ mergePropWithRandomPort(Property.TSERV_CLIENTPORT.getKey());
+ mergePropWithRandomPort(Property.MONITOR_PORT.getKey());
+ mergePropWithRandomPort(Property.GC_PORT.getKey());
+
+ // zookeeper port should be set explicitly in this class, not just on
the site config
+ if (zooKeeperPort == null)
+ zooKeeperPort = PortUtils.getRandomFreePort();
+ siteConfig.put(Property.INSTANCE_ZK_HOST.getKey(), "localhost:" +
zooKeeperPort);
+ initialized = true;
+ }
+ return this;
+ }
+
+ /**
+ * Set a given key/value on the site config if it doesn't already exist
+ */
+ private void mergeProp(String key, String value) {
+ if (!siteConfig.containsKey(key)) {
+ siteConfig.put(key, value);
+ }
+ }
+
+ /**
+ * Sets a given key with a random port for the value on the site config if
it doesn't already exist.
+ */
+ private void mergePropWithRandomPort(String key) {
+ if (!siteConfig.containsKey(key)) {
+ siteConfig.put(key, "0");
+ }
+ }
+
+ /**
+ * Calling this method is optional. If not set, it defaults to two.
+ *
+ * @param numTservers
+ * the number of tablet servers that mini accumulo cluster should
start
+ */
+ public MiniAccumuloConfigImpl setNumTservers(int numTservers) {
+ if (numTservers < 1)
+ throw new IllegalArgumentException("Must have at least one tablet
server");
+ this.numTservers = numTservers;
+ return this;
+ }
+
+ /**
+ * Calling this method is optional. If not set, defaults to 'miniInstance'
+ *
+ * @since 1.6.0
+ */
+ public MiniAccumuloConfigImpl setInstanceName(String instanceName) {
+ this.instanceName = instanceName;
+ return this;
+ }
+
+ /**
+ * Calling this method is optional. If not set, it defaults to an empty map.
+ *
+ * @param siteConfig
+ * key/values that you normally put in accumulo-site.xml can be
put here.
+ */
+ public MiniAccumuloConfigImpl setSiteConfig(Map<String,String> siteConfig) {
+ this.siteConfig = new HashMap<String,String>(siteConfig);
+ return this;
+ }
+
+ /**
+ * Calling this method is optional. A random port is generated by default
+ *
+ * @param zooKeeperPort
+ * A valid (and unused) port to use for the zookeeper
+ *
+ * @since 1.6.0
+ */
+ public MiniAccumuloConfigImpl setZooKeeperPort(int zooKeeperPort) {
+ this.zooKeeperPort = zooKeeperPort;
+ return this;
+ }
+
+ /**
+ * Sets the amount of memory to use for the given server type. Calling this
method is optional. Default memory is 128M
+ *
+ * @param serverType
+ * the type of server to apply the memory settings
+ * @param memory
+ * amount of memory to set
+ *
+ * @param memoryUnit
+ * the unit in which the memory size is expressed
+ *
+ * @since 1.6.0
+ */
+ public MiniAccumuloConfigImpl setMemory(ServerType serverType, long memory,
MemoryUnit memoryUnit) {
+ this.memoryConfig.put(serverType, memoryUnit.toBytes(memory));
+ return this;
+ }
+
+ /**
+ * Sets the default memory size to use. This value is also used when a
ServerType has not been configured explicitly. Calling this method is optional.
Default
+ * memory is 128M
+ *
+ * @param memory
+ * amount of memory to set
+ *
+ * @param memoryUnit
+ * the units for which to apply with the memory size
+ *
+ * @since 1.6.0
+ */
+ public MiniAccumuloConfigImpl setDefaultMemory(long memory, MemoryUnit
memoryUnit) {
+ this.defaultMemorySize = memoryUnit.toBytes(memory);
+ return this;
+ }
+
+ /**
+ * @return a copy of the site config
+ */
+ public Map<String,String> getSiteConfig() {
+ return new HashMap<String,String>(siteConfig);
+ }
+
+   /**
+    * Returns the instance name held by this configuration.
+    *
+    * @return name of configured instance
+    *
+    * @since 1.6.0
+    */
+   public String getInstanceName() {
+     return this.instanceName;
+   }
+
+   /**
+    * Returns the ZooKeeper port held by this configuration.
+    *
+    * @return The configured zookeeper port
+    *
+    * @since 1.6.0
+    */
+   public int getZooKeeperPort() {
+     return this.zooKeeperPort;
+   }
+
+   /**
+    * @return the {@code libDir} held by this configuration
+    */
+   File getLibDir() {
+     return this.libDir;
+   }
+
++  /**
++   * @return the {@code libExtDir} held by this configuration
++   */
++  File getLibExtDir() {
++    return this.libExtDir;
++  }
++
+   /**
+    * @return the {@code confDir} held by this configuration
+    */
+   public File getConfDir() {
+     return this.confDir;
+   }
+
+   /**
+    * @return the {@code zooKeeperDir} held by this configuration
+    */
+   File getZooKeeperDir() {
+     return this.zooKeeperDir;
+   }
+
+   /**
+    * @return the {@code accumuloDir} held by this configuration
+    */
+   public File getAccumuloDir() {
+     return this.accumuloDir;
+   }
+
+   /**
+    * @return the {@code logDir} held by this configuration
+    */
+   public File getLogDir() {
+     return this.logDir;
+   }
+
+   /**
+    * @return the {@code walogDir} held by this configuration
+    */
+   File getWalogDir() {
+     return this.walogDir;
+   }
+
+   /**
+    * Looks up the memory setting for one server type, falling back to the default memory size
+    * when that type has no entry of its own.
+    *
+    * @param serverType
+    *          get configuration for this server type
+    *
+    * @return memory configured in bytes, returns default if this server type is not configured
+    *
+    * @since 1.6.0
+    */
+   public long getMemory(ServerType serverType) {
+     if (memoryConfig.containsKey(serverType)) {
+       return memoryConfig.get(serverType);
+     }
+     return defaultMemorySize;
+   }
+
+   /**
+    * Returns the default memory size held by this configuration.
+    *
+    * @return memory configured in bytes
+    *
+    * @since 1.6.0
+    */
+   public long getDefaultMemory() {
+     return this.defaultMemorySize;
+   }
+
+ /**
+ * @return zookeeper connection string
+ *
+ * @since 1.6.0
+ */
+ public String getZooKeepers() {
+ return siteConfig.get(Property.INSTANCE_ZK_HOST.getKey());
+ }
+
+   /**
+    * Returns the {@code dir} held by this configuration.
+    *
+    * @return the base directory of the cluster configuration
+    */
+   public File getDir() {
+     return this.dir;
+   }
+
+   /**
+    * Returns the root password held by this configuration.
+    *
+    * @return the root password of this cluster configuration
+    */
+   public String getRootPassword() {
+     return this.rootPassword;
+   }
+
+   /**
+    * Returns the tablet server count held by this configuration.
+    *
+    * @return the number of tservers configured for this cluster
+    */
+   public int getNumTservers() {
+     return this.numTservers;
+   }
+
+   /**
+    * Reports the {@code jdwpEnabled} flag of this configuration.
+    *
+    * @return is the current configuration in jdwpEnabled mode?
+    *
+    * @since 1.6.0
+    */
+   public boolean isJDWPEnabled() {
+     return this.jdwpEnabled;
+   }
+
+   /**
+    * Sets the {@code jdwpEnabled} flag of this configuration.
+    *
+    * @param jdwpEnabled
+    *          should the processes run remote jdwpEnabled servers?
+    * @return the current instance
+    *
+    * @since 1.6.0
+    */
+   public MiniAccumuloConfigImpl setJDWPEnabled(boolean jdwpEnabled) {
+     this.jdwpEnabled = jdwpEnabled;
+     return this;
+   }
+
+   /**
+    * @return the current value of the {@code useMiniDFS} flag
+    */
+   public boolean useMiniDFS() {
+     return this.useMiniDFS;
+   }
+
+   /**
+    * Sets the {@code useMiniDFS} flag. This method returns nothing, so it cannot be chained.
+    *
+    * @param useMiniDFS
+    *          the new value of the flag
+    */
+   public void useMiniDFS(boolean useMiniDFS) {
+     this.useMiniDFS = useMiniDFS;
+   }
+
+ /**
+ * @return location of client conf file containing connection parameters
for connecting to this minicluster
+ *
+ * @since 1.6.0
+ */
+ public File getClientConfFile() {
+ return new File(getConfDir(), "client.conf");
+ }
+
+ /**
+ * sets system properties set for service processes
+ *
+ * @since 1.6.0
+ */
+ public void setSystemProperties(Map<String,String> systemProperties) {
+ this.systemProperties = new HashMap<String,String>(systemProperties);
+ }
+
+ /**
+ * @return a copy of the system properties for service processes
+ *
+ * @since 1.6.0
+ */
+ public Map<String,String> getSystemProperties() {
+ return new HashMap<String,String>(systemProperties);
+ }
+
+   /**
+    * Gets the classpath elements to use when spawning processes. The returned array is a copy, so
+    * modifying it does not change this configuration.
+    *
+    * @return a copy of the classpathItems, or {@code null} if none are set
+    *
+    * @since 1.6.0
+    */
+   public String[] getClasspathItems() {
+     // Keep the null result for the unset case; otherwise hand out a copy rather than the
+     // internal array.
+     return classpathItems == null ? null : classpathItems.clone();
+   }
+
+   /**
+    * Sets the classpath elements to use when spawning processes. The array is copied, so later
+    * changes to the caller's array do not change this configuration.
+    *
+    * @param classpathItems
+    *          the classpathItems to set; a {@code null} array is stored as {@code null}
+    * @since 1.6.0
+    */
+   public void setClasspathItems(String... classpathItems) {
+     // A caller may pass an explicit null array; keep that as null rather than failing on clone.
+     this.classpathItems = classpathItems == null ? null : classpathItems.clone();
+   }
+
+   /**
+    * Gets the paths to use for loading native libraries. The returned array is a copy, so
+    * modifying it does not change this configuration.
+    *
+    * @return the paths to use for loading native libraries; an empty array if none are set
+    *
+    * @since 1.6.0
+    */
+   public String[] getNativeLibPaths() {
+     if (this.nativePathItems == null) {
+       return new String[0];
+     }
+     // Hand out a copy rather than the internal array.
+     return this.nativePathItems.clone();
+   }
+
+   /**
+    * Sets the path for processes to use for loading native libraries. The array is copied, so
+    * later changes to the caller's array do not change this configuration.
+    *
+    * @param nativePathItems
+    *          the nativePathItems to set; a {@code null} array is stored as {@code null}
+    * @since 1.6.0
+    */
+   public void setNativeLibPaths(String... nativePathItems) {
+     // A caller may pass an explicit null array; keep that as null rather than failing on clone.
+     this.nativePathItems = nativePathItems == null ? null : nativePathItems.clone();
+   }
+
+ /**
+ * Sets arbitrary configuration properties.
+ *
+ * @since 1.6.0
+ */
+ public void setProperty(Property p, String value) {
+ this.siteConfig.put(p.getKey(), value);
+ }
+}