Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT



Branch: refs/heads/master
Commit: d0f626d9670415d9788ae1c0af33d1b931484f73
Parents: 1413ebc 96bfa8c
Author: Keith Turner <>
Authored: Thu Jan 9 16:04:40 2014 -0500
Committer: Keith Turner <>
Committed: Thu Jan 9 16:04:40 2014 -0500

 .../minicluster/        |  7 +++--
 .../minicluster/         | 17 ------------
 .../minicluster/  | 27 --------------------
 3 files changed, 3 insertions(+), 48 deletions(-)
diff --cc 
index d12c05a,0000000..7c618be
mode 100644,000000..100644
@@@ -1,375 -1,0 +1,374 @@@
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.minicluster;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Properties;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.gc.SimpleGarbageCollector;
 +import org.apache.accumulo.server.master.Master;
 +import org.apache.accumulo.server.tabletserver.TabletServer;
 +import org.apache.accumulo.server.util.Initialize;
 +import org.apache.accumulo.server.util.PortUtils;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.start.Main;
 +import org.apache.zookeeper.server.ZooKeeperServerMain;
 + * A utility class that will create Zookeeper and Accumulo processes that 
write all of their data to a single local directory. This class makes it easy 
to test
 + * code against a real Accumulo instance. Its much more accurate for testing 
than MockAccumulo, but much slower than MockAccumulo.
 + * 
 + * @since 1.5.0
 + */
 +public class MiniAccumuloCluster {
 +  private static final String INSTANCE_SECRET = "DONTTELL";
 +  private static final String INSTANCE_NAME = "miniInstance";
 +  private static class LogWriter extends Thread {
 +    private BufferedReader in;
 +    private BufferedWriter out;
 +    /**
 +     * @throws IOException
 +     */
 +    public LogWriter(InputStream stream, File logFile) throws IOException {
 +      this.setDaemon(true);
 + = new BufferedReader(new InputStreamReader(stream));
 +      out = new BufferedWriter(new FileWriter(logFile));
 +      SimpleTimer.getInstance().schedule(new Runnable() {
 +        @Override
 +        public void run() {
 +          try {
 +            flush();
 +          } catch (IOException e) {
 +            e.printStackTrace();
 +          }
 +        }
 +      }, 1000, 1000);
 +    }
 +    public synchronized void flush() throws IOException {
 +      if (out != null)
 +        out.flush();
 +    }
 +    @Override
 +    public void run() {
 +      String line;
 +      try {
 +        while ((line = in.readLine()) != null) {
 +          out.append(line);
 +          out.append("\n");
 +        }
 +        synchronized (this) {
 +          out.close();
 +          out = null;
 +          in.close();
 +        }
 +      } catch (IOException e) {}
 +    }
 +  }
 +  private File libDir;
 +  private File confDir;
 +  private File zooKeeperDir;
 +  private File accumuloDir;
 +  private File zooCfgFile;
 +  private File logDir;
 +  private File walogDir;
 +  private Process zooKeeperProcess;
 +  private Process masterProcess;
 +  private Process gcProcess;
 +  private int zooKeeperPort;
 +  private List<LogWriter> logWriters = new 
 +  private MiniAccumuloConfig config;
 +  private Process[] tabletServerProcesses;
 +  private Process exec(Class<? extends Object> clazz, String... args) throws 
IOException {
 +    String javaHome = System.getProperty("java.home");
 +    String javaBin = javaHome + File.separator + "bin" + File.separator + 
 +    String classpath = System.getProperty("java.class.path");
 +    classpath = confDir.getAbsolutePath() + File.pathSeparator + classpath;
 +    String className = clazz.getCanonicalName();
 +    ArrayList<String> argList = new ArrayList<String>();
 +    argList.addAll(Arrays.asList(javaBin, "-cp", classpath, "-Xmx128m", 
"-XX:+UseConcMarkSweepGC", "-XX:CMSInitiatingOccupancyFraction=75",
 +        "-Dapple.awt.UIElement=true", Main.class.getName(), className));
 +    argList.addAll(Arrays.asList(args));
 +    ProcessBuilder builder = new ProcessBuilder(argList);
 +    builder.environment().put("ACCUMULO_HOME", 
 +    builder.environment().put("ACCUMULO_LOG_DIR", logDir.getAbsolutePath());
 +    // if we're running under accumulo.start, we forward these env vars
 +    String env = System.getenv("HADOOP_PREFIX");
 +    if (env != null)
 +      builder.environment().put("HADOOP_PREFIX", env);
 +    env = System.getenv("ZOOKEEPER_HOME");
 +    if (env != null)
 +      builder.environment().put("ZOOKEEPER_HOME", env);
 +    Process process = builder.start();
 +    LogWriter lw;
 +    lw = new LogWriter(process.getErrorStream(), new File(logDir, 
clazz.getSimpleName() + "_" + process.hashCode() + ".err"));
 +    logWriters.add(lw);
 +    lw.start();
 +    lw = new LogWriter(process.getInputStream(), new File(logDir, 
clazz.getSimpleName() + "_" + process.hashCode() + ".out"));
 +    logWriters.add(lw);
 +    lw.start();
 +    return process;
 +  }
 +  private void appendProp(FileWriter fileWriter, Property key, String value, 
Map<String,String> siteConfig) throws IOException {
 +    appendProp(fileWriter, key.getKey(), value, siteConfig);
 +  }
 +  private void appendProp(FileWriter fileWriter, String key, String value, 
Map<String,String> siteConfig) throws IOException {
 +    if (!siteConfig.containsKey(key))
 +      fileWriter.append("<property><name>" + key + "</name><value>" + value + 
 +  }
 +  /**
 +   * Sets a given key with a random port for the value on the site config if 
it doesn't already exist.
 +   */
 +  private void mergePropWithRandomPort(Map<String,String> siteConfig, String 
key) {
 +    if (!siteConfig.containsKey(key)) {
 +      siteConfig.put(key, "0");
 +    }
 +  }
 +  /**
 +   * 
 +   * @param dir
 +   *          An empty or nonexistant temp directoy that Accumulo and 
Zookeeper can store data in. Creating the directory is left to the user. Java 
7, Guava,
 +   *          and Junit provide methods for creating temporary directories.
 +   * @param rootPassword
 +   *          Initial root password for instance.
 +   * @throws IOException
 +   */
 +  public MiniAccumuloCluster(File dir, String rootPassword) throws 
IOException {
 +    this(new MiniAccumuloConfig(dir, rootPassword));
 +  }
 +  /**
 +   * @param config
 +   *          initial configuration
 +   * @throws IOException
 +   */
 +  public MiniAccumuloCluster(MiniAccumuloConfig config) throws IOException {
 +    if (config.getDir().exists() && !config.getDir().isDirectory())
 +      throw new IllegalArgumentException("Must pass in directory, " + 
config.getDir() + " is a file");
 +    if (config.getDir().exists() && config.getDir().list().length != 0)
 +      throw new IllegalArgumentException("Directory " + config.getDir() + " 
is not empty");
 +    this.config = config;
 +    libDir = new File(config.getDir(), "lib");
 +    confDir = new File(config.getDir(), "conf");
 +    accumuloDir = new File(config.getDir(), "accumulo");
 +    zooKeeperDir = new File(config.getDir(), "zookeeper");
 +    logDir = new File(config.getDir(), "logs");
 +    walogDir = new File(config.getDir(), "walogs");
 +    confDir.mkdirs();
 +    accumuloDir.mkdirs();
 +    zooKeeperDir.mkdirs();
 +    logDir.mkdirs();
 +    walogDir.mkdirs();
 +    libDir.mkdirs();
 +    zooKeeperPort = PortUtils.getRandomFreePort();
 +    File siteFile = new File(confDir, "accumulo-site.xml");
 +    FileWriter fileWriter = new FileWriter(siteFile);
 +    fileWriter.append("<configuration>\n");
 +    HashMap<String,String> siteConfig = new 
 +    appendProp(fileWriter, Property.INSTANCE_DFS_URI, "file:///", siteConfig);
 +    appendProp(fileWriter, Property.INSTANCE_DFS_DIR, 
accumuloDir.getAbsolutePath(), siteConfig);
 +    appendProp(fileWriter, Property.INSTANCE_ZK_HOST, "localhost:" + 
zooKeeperPort, siteConfig);
 +    appendProp(fileWriter, Property.INSTANCE_SECRET, INSTANCE_SECRET, 
 +    appendProp(fileWriter, Property.TSERV_PORTSEARCH, "true", siteConfig);
 +    appendProp(fileWriter, Property.LOGGER_DIR, walogDir.getAbsolutePath(), 
 +    appendProp(fileWriter, Property.TSERV_DATACACHE_SIZE, "10M", siteConfig);
 +    appendProp(fileWriter, Property.TSERV_INDEXCACHE_SIZE, "10M", siteConfig);
 +    appendProp(fileWriter, Property.TSERV_MAXMEM, "50M", siteConfig);
 +    appendProp(fileWriter, Property.TSERV_WALOG_MAX_SIZE, "100M", siteConfig);
 +    appendProp(fileWriter, Property.TSERV_NATIVEMAP_ENABLED, "false", 
 +    appendProp(fileWriter, Property.TRACE_TOKEN_PROPERTY_PREFIX + 
".password", config.getRootPassword(), siteConfig);
-     appendProp(fileWriter, Property.GC_CYCLE_DELAY, "30s", siteConfig);
++    appendProp(fileWriter, Property.GC_CYCLE_DELAY, "4s", siteConfig);
++    appendProp(fileWriter, Property.GC_CYCLE_START, "0s", siteConfig);
 +    mergePropWithRandomPort(siteConfig, Property.MASTER_CLIENTPORT.getKey());
 +    mergePropWithRandomPort(siteConfig, Property.TRACE_PORT.getKey());
 +    mergePropWithRandomPort(siteConfig, Property.TSERV_CLIENTPORT.getKey());
 +    mergePropWithRandomPort(siteConfig, Property.MONITOR_PORT.getKey());
 +    mergePropWithRandomPort(siteConfig, Property.GC_PORT.getKey());
 +    // since there is a small amount of memory, check more frequently for 
majc... setting may not be needed in 1.5
 +    appendProp(fileWriter, Property.TSERV_MAJC_DELAY, "3", siteConfig);
 +    String cp = System.getenv("ACCUMULO_HOME") + "/lib/.*.jar," + 
"$ZOOKEEPER_HOME/zookeeper[^.].*.jar," + "$HADOOP_HOME/[^.].*.jar,"
 +        + "$HADOOP_HOME/lib/[^.].*.jar," + 
"$HADOOP_PREFIX/share/hadoop/common/.*.jar," + 
 +        + "$HADOOP_PREFIX/share/hadoop/hdfs/.*.jar," + 
 +    appendProp(fileWriter, Property.GENERAL_CLASSPATHS, cp, siteConfig);
 +    appendProp(fileWriter, Property.GENERAL_DYNAMIC_CLASSPATHS, 
libDir.getAbsolutePath(), siteConfig);
 +    for (Entry<String,String> entry : siteConfig.entrySet())
 +      fileWriter.append("<property><name>" + entry.getKey() + 
"</name><value>" + entry.getValue() + "</value></property>\n");
 +    fileWriter.append("</configuration>\n");
 +    fileWriter.close();
 +    zooCfgFile = new File(confDir, "zoo.cfg");
 +    fileWriter = new FileWriter(zooCfgFile);
 +    // zookeeper uses Properties to read its config, so use that to write in 
order to properly escape things like Windows paths
 +    Properties zooCfg = new Properties();
 +    zooCfg.setProperty("tickTime", "1000");
 +    zooCfg.setProperty("initLimit", "10");
 +    zooCfg.setProperty("syncLimit", "5");
 +    zooCfg.setProperty("clientPort", zooKeeperPort + "");
 +    zooCfg.setProperty("maxClientCnxns", "100");
 +    zooCfg.setProperty("dataDir", zooKeeperDir.getAbsolutePath());
 +, null);
 +    fileWriter.close();
 +  }
 +  /**
 +   * Starts Accumulo and Zookeeper processes. Can only be called once.
 +   * 
 +   * @throws IOException
 +   * @throws InterruptedException
 +   * @throws IllegalStateException
 +   *           if already started
 +   */
 +  public void start() throws IOException, InterruptedException {
 +    if (zooKeeperProcess != null)
 +      throw new IllegalStateException("Already started");
 +    Runtime.getRuntime().addShutdownHook(new Thread() {
 +      @Override
 +      public void run() {
 +        try {
 +          MiniAccumuloCluster.this.stop();
 +        } catch (IOException e) {
 +          e.printStackTrace();
 +        } catch (InterruptedException e) {
 +          e.printStackTrace();
 +        }
 +      }
 +    });
 +    zooKeeperProcess = exec(Main.class, ZooKeeperServerMain.class.getName(), 
 +    // sleep a little bit to let zookeeper come up before calling init, seems 
to work better
 +    UtilWaitThread.sleep(250);
 +    Process initProcess = exec(Initialize.class, "--instance-name", 
INSTANCE_NAME, "--password", config.getRootPassword());
 +    int ret = initProcess.waitFor();
 +    if (ret != 0) {
 +      throw new RuntimeException("Initialize process returned " + ret + ". 
Check the logs in " + logDir + " for errors.");
 +    }
 +    tabletServerProcesses = new Process[config.getNumTservers()];
 +    for (int i = 0; i < config.getNumTservers(); i++) {
 +      tabletServerProcesses[i] = exec(TabletServer.class);
 +    }
 +    masterProcess = exec(Master.class);
-     if (config.shouldRunGC()) {
-       gcProcess = exec(SimpleGarbageCollector.class);
-     }
++    gcProcess = exec(SimpleGarbageCollector.class);
 +  }
 +  /**
 +   * @return Accumulo instance name
 +   */
 +  public String getInstanceName() {
 +    return INSTANCE_NAME;
 +  }
 +  /**
 +   * @return zookeeper connection string
 +   */
 +  public String getZooKeepers() {
 +    return "localhost:" + zooKeeperPort;
 +  }
 +  /**
 +   * Stops Accumulo and Zookeeper processes. If stop is not called, there is 
a shutdown hook that is setup to kill the processes. Howerver its probably best 
 +   * call stop in a finally block as soon as possible.
 +   * 
 +   * @throws IOException
 +   * @throws InterruptedException
 +   */
 +  public void stop() throws IOException, InterruptedException {
 +    if (zooKeeperProcess != null)
 +      zooKeeperProcess.destroy();
 +    if (masterProcess != null)
 +      masterProcess.destroy();
 +    if (tabletServerProcesses != null) {
 +      for (Process tserver : tabletServerProcesses) {
 +        tserver.destroy();
 +      }
 +    }
 +    for (LogWriter lw : logWriters)
 +      lw.flush();
 +    if (gcProcess != null)
 +      gcProcess.destroy();
 +  }
diff --cc 
index 94094ed,0000000..17ab03a
mode 100644,000000..100644
@@@ -1,107 -1,0 +1,90 @@@
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.minicluster;
 +import java.util.Collections;
 +import java.util.Map;
 + * Holds configuration for {@link MiniAccumuloCluster}. Required 
configurations must be passed to constructor and all other configurations are 
 + * 
 + * @since 1.5.0
 + */
 +public class MiniAccumuloConfig {
 +  private File dir = null;
 +  private String rootPassword = null;
 +  private Map<String,String> siteConfig = Collections.emptyMap();
 +  private int numTservers = 2;
-   private boolean runGC = false;
 +  /**
 +   * @param dir
 +   *          An empty or nonexistant temp directoy that Accumulo and 
Zookeeper can store data in. Creating the directory is left to the user. Java 
7, Guava,
 +   *          and Junit provide methods for creating temporary directories.
 +   * @param rootPassword
 +   *          The initial password for the Accumulo root user
 +   */
 +  public MiniAccumuloConfig(File dir, String rootPassword) {
 +    this.dir = dir;
 +    this.rootPassword = rootPassword;
 +  }
 +  public File getDir() {
 +    return dir;
 +  }
 +  public String getRootPassword() {
 +    return rootPassword;
 +  }
 +  public int getNumTservers() {
 +    return numTservers;
 +  }
 +  /**
 +   * Calling this method is optional. If not set, it defaults to two.
 +   * 
 +   * @param numTservers
 +   *          the number of tablet servers that mini accumulo cluster should 
 +   */
 +  public MiniAccumuloConfig setNumTservers(int numTservers) {
 +    if (numTservers < 1)
 +      throw new IllegalArgumentException("Must have at least one tablet 
 +    this.numTservers = numTservers;
 +    return this;
 +  }
 +  public Map<String,String> getSiteConfig() {
 +    return siteConfig;
 +  }
 +  /**
 +   * Calling this method is optional. If not set, it defautls to an empty map.
 +   * 
 +   * @param siteConfig
 +   *          key/values that you normally put in accumulo-site.xml can be 
put here
 +   */
 +  public MiniAccumuloConfig setSiteConfig(Map<String,String> siteConfig) {
 +    this.siteConfig = siteConfig;
 +    return this;
 +  }
-   /**
-    * Whether or not the Accumulo garbage collector proces will run
-    */
-   public boolean shouldRunGC() {
-     return runGC;
-   }
-   /**
-    * Sets if the Accumulo garbage collector process should run
-    * 
-    * @param shouldRunGC
-    */
-   public void runGC(boolean shouldRunGC) {
-     runGC = shouldRunGC;
-   }
diff --cc 
index a1f58f6,0000000..318f0af
mode 100644,000000..100644
@@@ -1,150 -1,0 +1,123 @@@
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.minicluster;
 +import java.util.Map;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.server.util.PortUtils;
 +import org.apache.log4j.Level;
 +import org.apache.log4j.Logger;
 +import org.junit.Assert;
 +import org.junit.Ignore;
- import org.junit.Test;
 +import org.junit.rules.TemporaryFolder;
- import;
 + * 
 + */
 +public class MiniAccumuloClusterGCTest {
-   @Test
-   public void testGcConfig() throws Exception {
-     File f = Files.createTempDir();
-     f.deleteOnExit();
-     try {
-       MiniAccumuloConfig macConfig = new MiniAccumuloConfig(f, passwd);
-       macConfig.setNumTservers(1);
-       Assert.assertEquals(false, macConfig.shouldRunGC());
-       // Turn on the garbage collector
-       macConfig.runGC(true);
-       Assert.assertEquals(true, macConfig.shouldRunGC());
-     } finally {
-       if (null != f && f.exists()) {
-         f.delete();
-       }
-     }
-   }
 +  private static TemporaryFolder tmpDir = new TemporaryFolder();
 +  private static MiniAccumuloConfig macConfig;
 +  private static MiniAccumuloCluster accumulo;
 +  private static final String passwd = "password";
 +  public static void setupMiniCluster() throws Exception {
 +    tmpDir.create();
 +    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
 +    macConfig = new MiniAccumuloConfig(tmpDir.getRoot(), passwd);
 +    macConfig.setNumTservers(1);
-     // Turn on the garbage collector
-     macConfig.runGC(true);
 +    String gcPort = Integer.toString(PortUtils.getRandomFreePort());
 +    // And tweak the settings to make it run often
 +    Map<String,String> config = 
ImmutableMap.of(Property.GC_CYCLE_DELAY.getKey(), "1s", 
Property.GC_CYCLE_START.getKey(), "0s", Property.GC_PORT.getKey(), gcPort);
 +    macConfig.setSiteConfig(config);
 +    accumulo = new MiniAccumuloCluster(macConfig);
 +    accumulo.start();
 +  }
 +  public static void tearDownMiniCluster() throws Exception {
 +    accumulo.stop();
 +    tmpDir.delete();
 +  }
 +  // This test seems to be a little too unstable for a unit test
 +  @Ignore
 +  public void test() throws Exception {
 +    ZooKeeperInstance inst = new 
ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
 +    Connector c = inst.getConnector("root", new PasswordToken(passwd));
 +    final String table = "foobar";
 +    c.tableOperations().create(table);
 +    BatchWriter bw = null;
 +    // Add some data
 +    try {
 +      bw = c.createBatchWriter(table, new BatchWriterConfig());
 +      Mutation m = new Mutation("a");
 +      for (int i = 0; i < 50; i++) {
 +        m.put("colf", Integer.toString(i), "");
 +      }
 +      bw.addMutation(m);
 +    } finally {
 +      if (null != bw) {
 +        bw.close();
 +      }
 +    }
 +    final boolean flush = true, wait = true;
 +    // Compact the tables to get some rfiles which we can gc
 +    c.tableOperations().compact(table, null, null, flush, wait);
 +    c.tableOperations().compact("!METADATA", null, null, flush, wait);
 +    File accumuloDir = new File(tmpDir.getRoot().getAbsolutePath(), 
 +    File tables = new File(accumuloDir.getAbsolutePath(), "tables");
 +    int fileCountAfterCompaction = FileUtils.listFiles(tables, new 
SuffixFileFilter(".rf"), TrueFileFilter.TRUE).size();
 +    // Sleep for 4s to let the GC do its thing
 +    for (int i = 1; i < 5; i++) {
 +      Thread.sleep(1000);
 +      int fileCountAfterGCWait = FileUtils.listFiles(tables, new 
SuffixFileFilter(".rf"), TrueFileFilter.TRUE).size();
 +      if (fileCountAfterGCWait < fileCountAfterCompaction) {
 +        return;
 +      }
 +    }
 +"Expected to find less files after compaction and pause for 
 +  }

Reply via email to