Merge branch '1.7'

Conflicts:
        core/src/test/java/org/apache/accumulo/core/util/ByteBufferUtilTest.java
        test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
        test/src/main/java/org/apache/accumulo/test/functional/DeleteIT.java
        test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java
        test/src/main/java/org/apache/accumulo/test/functional/RestartIT.java
        test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java
        test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
        test/src/test/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2c8088e9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2c8088e9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2c8088e9

Branch: refs/heads/master
Commit: 2c8088e99d6820449e2b364162fb9bd03d212f38
Parents: 1c696a9 a706b01
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Tue Feb 2 22:08:12 2016 -0500
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Tue Feb 2 22:08:12 2016 -0500

----------------------------------------------------------------------
 .../core/file/rfile/VisMetricsGatherer.java     |  5 +++--
 .../accumulo/core/util/ByteBufferUtilTest.java  | 20 ++++++++++----------
 .../core/util/UnsynchronizedBufferTest.java     | 10 +++++-----
 .../apache/accumulo/proxy/ProxyServerTest.java  |  5 ++---
 .../server/util/FileSystemMonitorTest.java      |  3 +--
 .../providers/HdfsFileContentInfoFactory.java   |  4 +++-
 .../vfs/providers/HdfsFileSystem.java           |  4 +++-
 .../vfs/providers/HdfsRandomAccessContent.java  |  5 +++--
 .../accumulo/harness/MiniClusterHarness.java    |  4 ++--
 .../accumulo/test/ConditionalWriterIT.java      |  6 +++---
 .../accumulo/test/GenerateSequentialRFile.java  |  4 ++--
 .../org/apache/accumulo/test/ShellConfigIT.java |  4 ++--
 .../accumulo/test/TransportCachingIT.java       |  2 +-
 .../functional/BulkSplitOptimizationIT.java     |  2 +-
 .../accumulo/test/functional/DeleteIT.java      |  5 ++---
 .../accumulo/test/functional/ExamplesIT.java    |  2 +-
 .../accumulo/test/functional/ReadWriteIT.java   |  6 +++---
 .../accumulo/test/functional/RestartIT.java     | 13 ++++++-------
 .../test/functional/RestartStressIT.java        |  9 ++++-----
 .../accumulo/test/functional/ScanIdIT.java      |  2 +-
 .../accumulo/test/functional/SplitIT.java       |  9 ++++-----
 .../replication/merkle/cli/GenerateHashes.java  |  5 +++--
 22 files changed, 65 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/core/src/test/java/org/apache/accumulo/core/util/ByteBufferUtilTest.java
----------------------------------------------------------------------
diff --cc core/src/test/java/org/apache/accumulo/core/util/ByteBufferUtilTest.java
index d6e1b35,5a8c0dc..85a36fa
--- a/core/src/test/java/org/apache/accumulo/core/util/ByteBufferUtilTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/util/ByteBufferUtilTest.java
@@@ -17,7 -17,8 +17,9 @@@
  
  package org.apache.accumulo.core.util;
  
+ import static java.nio.charset.StandardCharsets.UTF_8;
+ 
 +import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
@@@ -54,16 -53,7 +54,16 @@@ public class ByteBufferUtilTest 
        throw new RuntimeException(e);
      }
  
-     Assert.assertEquals(expected, new String(baos.toByteArray(), Charsets.UTF_8));
+     Assert.assertEquals(expected, new String(baos.toByteArray(), UTF_8));
 +
 +    ByteArrayInputStream bais = ByteBufferUtil.toByteArrayInputStream(bb);
 +    byte[] buffer = new byte[expected.length()];
 +    try {
 +      bais.read(buffer);
-       Assert.assertEquals(expected, new String(buffer, Charsets.UTF_8));
++      Assert.assertEquals(expected, new String(buffer, UTF_8));
 +    } catch (IOException e) {
 +      throw new RuntimeException(e);
 +    }
    }
  
    @Test

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java
index d923593,0000000..f021e32
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java
+++ b/test/src/main/java/org/apache/accumulo/harness/MiniClusterHarness.java
@@@ -1,242 -1,0 +1,242 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.harness;
 +
++import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.BufferedOutputStream;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.OutputStream;
 +import java.util.Map;
 +import java.util.UUID;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import org.apache.accumulo.cluster.ClusterUser;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 +import org.apache.accumulo.server.security.handler.KerberosAuthenticator;
 +import org.apache.accumulo.server.security.handler.KerberosAuthorizor;
 +import org.apache.accumulo.server.security.handler.KerberosPermissionHandler;
 +import org.apache.accumulo.test.functional.NativeMapIT;
 +import org.apache.accumulo.test.util.CertUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import com.google.common.base.Charsets;
 +import com.google.common.base.Preconditions;
 +
 +/**
 + * Harness that sets up a MiniAccumuloCluster in a manner expected for Accumulo integration tests.
 + */
 +public class MiniClusterHarness {
 +  private static final Logger log = 
LoggerFactory.getLogger(MiniClusterHarness.class);
 +
 +  private static final AtomicLong COUNTER = new AtomicLong(0);
 +
 +  public static final String USE_SSL_FOR_IT_OPTION = 
"org.apache.accumulo.test.functional.useSslForIT",
 +      USE_CRED_PROVIDER_FOR_IT_OPTION = 
"org.apache.accumulo.test.functional.useCredProviderForIT",
 +      USE_KERBEROS_FOR_IT_OPTION = 
"org.apache.accumulo.test.functional.useKrbForIT", TRUE = 
Boolean.toString(true);
 +
 +  // TODO These are defined in MiniKdc >= 2.6.0. Can be removed when minimum Hadoop dependency is increased to that.
 +  public static final String JAVA_SECURITY_KRB5_CONF = 
"java.security.krb5.conf", SUN_SECURITY_KRB5_DEBUG = "sun.security.krb5.debug";
 +
 +  /**
 +   * Create a MiniAccumuloCluster using the given Token as the credentials for the root user.
 +   */
 +  public MiniAccumuloClusterImpl create(AuthenticationToken token) throws 
Exception {
 +    return create(MiniClusterHarness.class.getName(), 
Long.toString(COUNTER.incrementAndGet()), token);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(AuthenticationToken token, TestingKdc 
kdc) throws Exception {
 +    return create(MiniClusterHarness.class.getName(), 
Long.toString(COUNTER.incrementAndGet()), token, kdc);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(AccumuloITBase testBase, 
AuthenticationToken token) throws Exception {
 +    return create(testBase.getClass().getName(), 
testBase.testName.getMethodName(), token);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(AccumuloITBase testBase, 
AuthenticationToken token, TestingKdc kdc) throws Exception {
 +    return create(testBase, token, kdc, 
MiniClusterConfigurationCallback.NO_CALLBACK);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(AccumuloITBase testBase, 
AuthenticationToken token, TestingKdc kdc, MiniClusterConfigurationCallback 
configCallback)
 +      throws Exception {
 +    return create(testBase.getClass().getName(), 
testBase.testName.getMethodName(), token, configCallback, kdc);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(AccumuloClusterHarness testBase, 
AuthenticationToken token, TestingKdc kdc) throws Exception {
 +    return create(testBase.getClass().getName(), 
testBase.testName.getMethodName(), token, testBase, kdc);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(AccumuloClusterHarness testBase, 
AuthenticationToken token, MiniClusterConfigurationCallback callback) throws 
Exception {
 +    return create(testBase.getClass().getName(), 
testBase.testName.getMethodName(), token, callback);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(String testClassName, String 
testMethodName, AuthenticationToken token) throws Exception {
 +    return create(testClassName, testMethodName, token, 
MiniClusterConfigurationCallback.NO_CALLBACK);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(String testClassName, String 
testMethodName, AuthenticationToken token, TestingKdc kdc) throws Exception {
 +    return create(testClassName, testMethodName, token, 
MiniClusterConfigurationCallback.NO_CALLBACK, kdc);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(String testClassName, String 
testMethodName, AuthenticationToken token, MiniClusterConfigurationCallback 
configCallback)
 +      throws Exception {
 +    return create(testClassName, testMethodName, token, configCallback, null);
 +  }
 +
 +  public MiniAccumuloClusterImpl create(String testClassName, String 
testMethodName, AuthenticationToken token,
 +      MiniClusterConfigurationCallback configCallback, TestingKdc kdc) throws 
Exception {
 +    Preconditions.checkNotNull(token);
 +    Preconditions.checkArgument(token instanceof PasswordToken || token 
instanceof KerberosToken, "A PasswordToken or KerberosToken is required");
 +
 +    String rootPasswd;
 +    if (token instanceof PasswordToken) {
-       rootPasswd = new String(((PasswordToken) token).getPassword(), Charsets.UTF_8);
++      rootPasswd = new String(((PasswordToken) token).getPassword(), UTF_8);
 +    } else {
 +      rootPasswd = UUID.randomUUID().toString();
 +    }
 +
 +    File baseDir = AccumuloClusterHarness.createTestDir(testClassName + "_" + 
testMethodName);
 +    MiniAccumuloConfigImpl cfg = new MiniAccumuloConfigImpl(baseDir, 
rootPasswd);
 +
 +    // Enable native maps by default
 +    cfg.setNativeLibPaths(NativeMapIT.nativeMapLocation().getAbsolutePath());
 +    cfg.setProperty(Property.TSERV_NATIVEMAP_ENABLED, 
Boolean.TRUE.toString());
 +
 +    Configuration coreSite = new Configuration(false);
 +
 +    // Setup SSL and credential providers if the properties request such
 +    configureForEnvironment(cfg, getClass(), 
AccumuloClusterHarness.getSslDir(baseDir), coreSite, kdc);
 +
 +    // Invoke the callback for tests to configure MAC before it starts
 +    configCallback.configureMiniCluster(cfg, coreSite);
 +
 +    MiniAccumuloClusterImpl miniCluster = new MiniAccumuloClusterImpl(cfg);
 +
 +    // Write out any configuration items to a file so HDFS will pick them up automatically (from the classpath)
 +    if (coreSite.size() > 0) {
 +      File csFile = new File(miniCluster.getConfig().getConfDir(), 
"core-site.xml");
 +      if (csFile.exists())
 +        throw new RuntimeException(csFile + " already exist");
 +
 +      OutputStream out = new BufferedOutputStream(new FileOutputStream(new 
File(miniCluster.getConfig().getConfDir(), "core-site.xml")));
 +      coreSite.writeXml(out);
 +      out.close();
 +    }
 +
 +    return miniCluster;
 +  }
 +
 +  protected void configureForEnvironment(MiniAccumuloConfigImpl cfg, Class<?> 
testClass, File folder, Configuration coreSite, TestingKdc kdc) {
 +    if (TRUE.equals(System.getProperty(USE_SSL_FOR_IT_OPTION))) {
 +      configureForSsl(cfg, folder);
 +    }
 +    if (TRUE.equals(System.getProperty(USE_CRED_PROVIDER_FOR_IT_OPTION))) {
 +      cfg.setUseCredentialProvider(true);
 +    }
 +
 +    if (TRUE.equals(System.getProperty(USE_KERBEROS_FOR_IT_OPTION))) {
 +      if (TRUE.equals(System.getProperty(USE_SSL_FOR_IT_OPTION))) {
 +        throw new RuntimeException("Cannot use both SSL and Kerberos");
 +      }
 +
 +      try {
 +        configureForKerberos(cfg, folder, coreSite, kdc);
 +      } catch (Exception e) {
 +        throw new RuntimeException("Failed to initialize KDC", e);
 +      }
 +    }
 +  }
 +
 +  protected void configureForSsl(MiniAccumuloConfigImpl cfg, File folder) {
 +    Map<String,String> siteConfig = cfg.getSiteConfig();
 +    if 
(TRUE.equals(siteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
 +      // already enabled; don't mess with it
 +      return;
 +    }
 +
 +    File sslDir = new File(folder, "ssl");
 +    assertTrue(sslDir.mkdirs() || sslDir.isDirectory());
 +    File rootKeystoreFile = new File(sslDir, "root-" + cfg.getInstanceName() 
+ ".jks");
 +    File localKeystoreFile = new File(sslDir, "local-" + 
cfg.getInstanceName() + ".jks");
 +    File publicTruststoreFile = new File(sslDir, "public-" + 
cfg.getInstanceName() + ".jks");
 +    final String rootKeystorePassword = "root_keystore_password", 
truststorePassword = "truststore_password";
 +    try {
 +      new CertUtils(Property.RPC_SSL_KEYSTORE_TYPE.getDefaultValue(), 
"o=Apache Accumulo,cn=MiniAccumuloCluster", "RSA", 2048, 
"sha1WithRSAEncryption")
 +          .createAll(rootKeystoreFile, localKeystoreFile, 
publicTruststoreFile, cfg.getInstanceName(), rootKeystorePassword, 
cfg.getRootPassword(),
 +              truststorePassword);
 +    } catch (Exception e) {
 +      throw new RuntimeException("error creating MAC keystore", e);
 +    }
 +
 +    siteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");
 +    siteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), 
localKeystoreFile.getAbsolutePath());
 +    siteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), 
cfg.getRootPassword());
 +    siteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), 
publicTruststoreFile.getAbsolutePath());
 +    siteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), 
truststorePassword);
 +    cfg.setSiteConfig(siteConfig);
 +  }
 +
 +  protected void configureForKerberos(MiniAccumuloConfigImpl cfg, File 
folder, Configuration coreSite, TestingKdc kdc) throws Exception {
 +    Map<String,String> siteConfig = cfg.getSiteConfig();
 +    if 
(TRUE.equals(siteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
 +      throw new RuntimeException("Cannot use both SSL and SASL/Kerberos");
 +    }
 +
 +    if 
(TRUE.equals(siteConfig.get(Property.INSTANCE_RPC_SASL_ENABLED.getKey()))) {
 +      // already enabled
 +      return;
 +    }
 +
 +    if (null == kdc) {
 +      throw new IllegalStateException("MiniClusterKdc was null");
 +    }
 +
 +    log.info("Enabling Kerberos/SASL for minicluster");
 +
 +    // Turn on SASL and set the keytab/principal information
 +    cfg.setProperty(Property.INSTANCE_RPC_SASL_ENABLED, "true");
 +    ClusterUser serverUser = kdc.getAccumuloServerUser();
 +    cfg.setProperty(Property.GENERAL_KERBEROS_KEYTAB, 
serverUser.getKeytab().getAbsolutePath());
 +    cfg.setProperty(Property.GENERAL_KERBEROS_PRINCIPAL, 
serverUser.getPrincipal());
 +    cfg.setProperty(Property.INSTANCE_SECURITY_AUTHENTICATOR, 
KerberosAuthenticator.class.getName());
 +    cfg.setProperty(Property.INSTANCE_SECURITY_AUTHORIZOR, 
KerberosAuthorizor.class.getName());
 +    cfg.setProperty(Property.INSTANCE_SECURITY_PERMISSION_HANDLER, 
KerberosPermissionHandler.class.getName());
 +    // Piggy-back on the "system user" credential, but use it as a normal KerberosToken, not the SystemToken.
 +    cfg.setProperty(Property.TRACE_USER, serverUser.getPrincipal());
 +    cfg.setProperty(Property.TRACE_TOKEN_TYPE, KerberosToken.CLASS_NAME);
 +
 +    // Pass down some KRB5 debug properties
 +    Map<String,String> systemProperties = cfg.getSystemProperties();
 +    systemProperties.put(JAVA_SECURITY_KRB5_CONF, 
System.getProperty(JAVA_SECURITY_KRB5_CONF, ""));
 +    systemProperties.put(SUN_SECURITY_KRB5_DEBUG, 
System.getProperty(SUN_SECURITY_KRB5_DEBUG, "false"));
 +    cfg.setSystemProperties(systemProperties);
 +
 +    // Make sure UserGroupInformation will do the correct login
 +    
coreSite.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
 +
 +    cfg.setRootUserName(kdc.getRootUser().getPrincipal());
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
index 6485056,0000000..1ce7bf1
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java
@@@ -1,1484 -1,0 +1,1484 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.accumulo.test;
 +
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
++import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Random;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +
 +import org.apache.accumulo.cluster.AccumuloCluster;
 +import org.apache.accumulo.cluster.ClusterUser;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 +import org.apache.accumulo.core.client.ConditionalWriter;
 +import org.apache.accumulo.core.client.ConditionalWriter.Result;
 +import org.apache.accumulo.core.client.ConditionalWriter.Status;
 +import org.apache.accumulo.core.client.ConditionalWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.IteratorSetting;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableDeletedException;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.TableOfflineException;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.data.ArrayByteSequence;
 +import org.apache.accumulo.core.data.ByteSequence;
 +import org.apache.accumulo.core.data.Condition;
 +import org.apache.accumulo.core.data.ConditionalMutation;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.iterators.IteratorEnvironment;
 +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 +import org.apache.accumulo.core.iterators.LongCombiner.Type;
 +import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 +import org.apache.accumulo.core.iterators.WrappingIterator;
 +import org.apache.accumulo.core.iterators.user.SummingCombiner;
 +import org.apache.accumulo.core.iterators.user.VersioningIterator;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.security.ColumnVisibility;
 +import org.apache.accumulo.core.security.SystemPermission;
 +import org.apache.accumulo.core.security.TablePermission;
 +import org.apache.accumulo.core.trace.DistributedTrace;
 +import org.apache.accumulo.core.trace.Span;
 +import org.apache.accumulo.core.trace.Trace;
 +import org.apache.accumulo.core.util.FastFormat;
 +import org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.test.functional.BadIterator;
 +import org.apache.accumulo.test.functional.SlowIterator;
 +import org.apache.accumulo.tracer.TraceDump;
 +import org.apache.accumulo.tracer.TraceDump.Printer;
 +import org.apache.accumulo.tracer.TraceServer;
 +import org.apache.hadoop.io.Text;
 +import org.junit.Assert;
 +import org.junit.Assume;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
- import com.google.common.base.Charsets;
 +import com.google.common.collect.Iterables;
 +
 +/**
 + *
 + */
 +public class ConditionalWriterIT extends AccumuloClusterHarness {
 +  private static final Logger log = 
LoggerFactory.getLogger(ConditionalWriterIT.class);
 +
 +  @Override
 +  protected int defaultTimeoutSeconds() {
 +    return 60;
 +  }
 +
 +  public static long abs(long l) {
 +    l = Math.abs(l); // abs(Long.MIN_VALUE) == Long.MIN_VALUE...
 +    if (l < 0)
 +      return 0;
 +    return l;
 +  }
 +
 +  @Before
 +  public void deleteUsers() throws Exception {
 +    Connector conn = getConnector();
 +    Set<String> users = conn.securityOperations().listLocalUsers();
 +    ClusterUser user = getUser(0);
 +    if (users.contains(user.getPrincipal())) {
 +      conn.securityOperations().dropLocalUser(user.getPrincipal());
 +    }
 +  }
 +
 +  @Test
 +  public void testBasic() throws Exception {
 +
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    conn.tableOperations().create(tableName);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig());
 +
 +    // mutation conditional on column tx:seq not existing
 +    ConditionalMutation cm0 = new ConditionalMutation("99006", new 
Condition("tx", "seq"));
 +    cm0.put("name", "last", "doe");
 +    cm0.put("name", "first", "john");
 +    cm0.put("tx", "seq", "1");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
 +
 +    // mutation conditional on column tx:seq being 1
 +    ConditionalMutation cm1 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setValue("1"));
 +    cm1.put("name", "last", "Doe");
 +    cm1.put("tx", "seq", "2");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
 +
 +    // test condition where value differs
 +    ConditionalMutation cm2 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setValue("1"));
 +    cm2.put("name", "last", "DOE");
 +    cm2.put("tx", "seq", "2");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
 +
 +    // test condition where column does not exists
 +    ConditionalMutation cm3 = new ConditionalMutation("99006", new 
Condition("txtypo", "seq").setValue("1"));
 +    cm3.put("name", "last", "deo");
 +    cm3.put("tx", "seq", "2");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
 +
 +    // test two conditions, where one should fail
 +    ConditionalMutation cm4 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setValue("2"), new Condition("name", 
"last").setValue("doe"));
 +    cm4.put("name", "last", "deo");
 +    cm4.put("tx", "seq", "3");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
 +
 +    // test two conditions, where one should fail
 +    ConditionalMutation cm5 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setValue("1"), new Condition("name", 
"last").setValue("Doe"));
 +    cm5.put("name", "last", "deo");
 +    cm5.put("tx", "seq", "3");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
 +
 +    // ensure rejected mutations did not write
 +    Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
 +    scanner.fetchColumn(new Text("name"), new Text("last"));
 +    scanner.setRange(new Range("99006"));
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("Doe", entry.getValue().toString());
 +
 +    // test w/ two conditions that are met
 +    ConditionalMutation cm6 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setValue("2"), new Condition("name", 
"last").setValue("Doe"));
 +    cm6.put("name", "last", "DOE");
 +    cm6.put("tx", "seq", "3");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
 +
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("DOE", entry.getValue().toString());
 +
 +    // test a conditional mutation that deletes
 +    ConditionalMutation cm7 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setValue("3"));
 +    cm7.putDelete("name", "last");
 +    cm7.putDelete("name", "first");
 +    cm7.putDelete("tx", "seq");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
 +
 +    Assert.assertFalse("Did not expect to find any results", 
scanner.iterator().hasNext());
 +
 +    // add the row back
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
 +
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("doe", entry.getValue().toString());
 +  }
 +
 +  @Test
 +  public void testFields() throws Exception {
 +
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    String user = null;
 +    ClientConfiguration clientConf = cluster.getClientConfig();
 +    final boolean saslEnabled = 
clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false);
 +
 +    ClusterUser user1 = getUser(0);
 +    user = user1.getPrincipal();
 +    if (saslEnabled) {
 +      // The token is pointless for kerberos
 +      conn.securityOperations().createLocalUser(user, null);
 +    } else {
 +      conn.securityOperations().createLocalUser(user, new 
PasswordToken(user1.getPassword()));
 +    }
 +
 +    Authorizations auths = new Authorizations("A", "B");
 +
 +    conn.securityOperations().changeUserAuthorizations(user, auths);
 +    conn.securityOperations().grantSystemPermission(user, 
SystemPermission.CREATE_TABLE);
 +
 +    conn = conn.getInstance().getConnector(user, user1.getToken());
 +
 +    conn.tableOperations().create(tableName);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig().setAuthorizations(auths));
 +
 +    ColumnVisibility cva = new ColumnVisibility("A");
 +    ColumnVisibility cvb = new ColumnVisibility("B");
 +
 +    ConditionalMutation cm0 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cva));
 +    cm0.put("name", "last", cva, "doe");
 +    cm0.put("name", "first", cva, "john");
 +    cm0.put("tx", "seq", cva, "1");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
 +
 +    Scanner scanner = conn.createScanner(tableName, auths);
 +    scanner.setRange(new Range("99006"));
 +    // TODO verify all columns
 +    scanner.fetchColumn(new Text("tx"), new Text("seq"));
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("1", entry.getValue().toString());
 +    long ts = entry.getKey().getTimestamp();
 +
 +    // test wrong colf
 +    ConditionalMutation cm1 = new ConditionalMutation("99006", new 
Condition("txA", "seq").setVisibility(cva).setValue("1"));
 +    cm1.put("name", "last", cva, "Doe");
 +    cm1.put("name", "first", cva, "John");
 +    cm1.put("tx", "seq", cva, "2");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus());
 +
 +    // test wrong colq
 +    ConditionalMutation cm2 = new ConditionalMutation("99006", new 
Condition("tx", "seqA").setVisibility(cva).setValue("1"));
 +    cm2.put("name", "last", cva, "Doe");
 +    cm2.put("name", "first", cva, "John");
 +    cm2.put("tx", "seq", cva, "2");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus());
 +
 +    // test wrong colv
 +    ConditionalMutation cm3 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvb).setValue("1"));
 +    cm3.put("name", "last", cva, "Doe");
 +    cm3.put("name", "first", cva, "John");
 +    cm3.put("tx", "seq", cva, "2");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus());
 +
 +    // test wrong timestamp
 +    ConditionalMutation cm4 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cva).setTimestamp(ts + 1).setValue("1"));
 +    cm4.put("name", "last", cva, "Doe");
 +    cm4.put("name", "first", cva, "John");
 +    cm4.put("tx", "seq", cva, "2");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus());
 +
 +    // test wrong timestamp
 +    ConditionalMutation cm5 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cva).setTimestamp(ts - 1).setValue("1"));
 +    cm5.put("name", "last", cva, "Doe");
 +    cm5.put("name", "first", cva, "John");
 +    cm5.put("tx", "seq", cva, "2");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus());
 +
 +    // ensure no updates were made
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("1", entry.getValue().toString());
 +
 +    // set all columns correctly
 +    ConditionalMutation cm6 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cva).setTimestamp(ts).setValue("1"));
 +    cm6.put("name", "last", cva, "Doe");
 +    cm6.put("name", "first", cva, "John");
 +    cm6.put("tx", "seq", cva, "2");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
 +
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("2", entry.getValue().toString());
 +
 +  }
 +
 +  @Test
 +  public void testBadColVis() throws Exception {
 +    // test when a user sets a col vis in a condition that can never be seen
 +
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    conn.tableOperations().create(tableName);
 +
 +    Authorizations auths = new Authorizations("A", "B");
 +
 +    conn.securityOperations().changeUserAuthorizations(getAdminPrincipal(), 
auths);
 +
 +    Authorizations filteredAuths = new Authorizations("A");
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig().setAuthorizations(filteredAuths));
 +
 +    ColumnVisibility cva = new ColumnVisibility("A");
 +    ColumnVisibility cvb = new ColumnVisibility("B");
 +    ColumnVisibility cvc = new ColumnVisibility("C");
 +
 +    // User has authorization, but didn't include it in the writer
 +    ConditionalMutation cm0 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvb));
 +    cm0.put("name", "last", cva, "doe");
 +    cm0.put("name", "first", cva, "john");
 +    cm0.put("tx", "seq", cva, "1");
 +    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, 
cw.write(cm0).getStatus());
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvb).setValue("1"));
 +    cm1.put("name", "last", cva, "doe");
 +    cm1.put("name", "first", cva, "john");
 +    cm1.put("tx", "seq", cva, "1");
 +    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, 
cw.write(cm1).getStatus());
 +
 +    // User does not have the authorization
 +    ConditionalMutation cm2 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvc));
 +    cm2.put("name", "last", cva, "doe");
 +    cm2.put("name", "first", cva, "john");
 +    cm2.put("tx", "seq", cva, "1");
 +    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, 
cw.write(cm2).getStatus());
 +
 +    ConditionalMutation cm3 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvc).setValue("1"));
 +    cm3.put("name", "last", cva, "doe");
 +    cm3.put("name", "first", cva, "john");
 +    cm3.put("tx", "seq", cva, "1");
 +    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, 
cw.write(cm3).getStatus());
 +
 +    // if any visibility is bad, good visibilities don't override
 +    ConditionalMutation cm4 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvb), new Condition("tx", 
"seq").setVisibility(cva));
 +
 +    cm4.put("name", "last", cva, "doe");
 +    cm4.put("name", "first", cva, "john");
 +    cm4.put("tx", "seq", cva, "1");
 +    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, 
cw.write(cm4).getStatus());
 +
 +    ConditionalMutation cm5 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvb).setValue("1"), new Condition("tx", 
"seq")
 +        .setVisibility(cva).setValue("1"));
 +    cm5.put("name", "last", cva, "doe");
 +    cm5.put("name", "first", cva, "john");
 +    cm5.put("tx", "seq", cva, "1");
 +    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, 
cw.write(cm5).getStatus());
 +
 +    ConditionalMutation cm6 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvb).setValue("1"),
 +        new Condition("tx", "seq").setVisibility(cva));
 +    cm6.put("name", "last", cva, "doe");
 +    cm6.put("name", "first", cva, "john");
 +    cm6.put("tx", "seq", cva, "1");
 +    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, 
cw.write(cm6).getStatus());
 +
 +    ConditionalMutation cm7 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvb), new Condition("tx", 
"seq").setVisibility(cva)
 +        .setValue("1"));
 +    cm7.put("name", "last", cva, "doe");
 +    cm7.put("name", "first", cva, "john");
 +    cm7.put("tx", "seq", cva, "1");
 +    Assert.assertEquals(Status.INVISIBLE_VISIBILITY, 
cw.write(cm7).getStatus());
 +
 +    cw.close();
 +
 +    // test passing auths that exceed users configured auths
 +
 +    Authorizations exceedingAuths = new Authorizations("A", "B", "D");
 +    ConditionalWriter cw2 = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig().setAuthorizations(exceedingAuths));
 +
 +    ConditionalMutation cm8 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvb), new Condition("tx", 
"seq").setVisibility(cva)
 +        .setValue("1"));
 +    cm8.put("name", "last", cva, "doe");
 +    cm8.put("name", "first", cva, "john");
 +    cm8.put("tx", "seq", cva, "1");
 +
 +    try {
 +      Status status = cw2.write(cm8).getStatus();
 +      Assert.fail("Writing mutation with Authorizations the user doesn't have 
should fail. Got status: " + status);
 +    } catch (AccumuloSecurityException ase) {
 +      // expected, check specific failure?
 +    } finally {
 +      cw2.close();
 +    }
 +  }
 +
 +  @Test
 +  public void testConstraints() throws Exception {
 +    // ensure constraint violations are properly reported
 +
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    conn.tableOperations().create(tableName);
 +    conn.tableOperations().addConstraint(tableName, 
AlphaNumKeyConstraint.class.getName());
 +    conn.tableOperations().clone(tableName, tableName + "_clone", true, new 
HashMap<String,String>(), new HashSet<String>());
 +
 +    Scanner scanner = conn.createScanner(tableName + "_clone", new 
Authorizations());
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName + "_clone", 
new ConditionalWriterConfig());
 +
 +    ConditionalMutation cm0 = new ConditionalMutation("99006+", new 
Condition("tx", "seq"));
 +    cm0.put("tx", "seq", "1");
 +
 +    Assert.assertEquals(Status.VIOLATED, cw.write(cm0).getStatus());
 +    Assert.assertFalse("Should find no results in the table is mutation 
result was violated", scanner.iterator().hasNext());
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("99006", new 
Condition("tx", "seq"));
 +    cm1.put("tx", "seq", "1");
 +
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
 +    Assert.assertTrue("Accepted result should be returned when reading 
table", scanner.iterator().hasNext());
 +
 +    cw.close();
 +  }
 +
 +  @Test
 +  public void testIterators() throws Exception {
 +
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    conn.tableOperations().create(tableName, new 
NewTableConfiguration().withoutDefaultIterators());
 +
 +    BatchWriter bw = conn.createBatchWriter(tableName, new 
BatchWriterConfig());
 +
 +    Mutation m = new Mutation("ACCUMULO-1000");
 +    m.put("count", "comments", "1");
 +    bw.addMutation(m);
 +    bw.addMutation(m);
 +    bw.addMutation(m);
 +
 +    m = new Mutation("ACCUMULO-1001");
 +    m.put("count2", "comments", "1");
 +    bw.addMutation(m);
 +    bw.addMutation(m);
 +
 +    m = new Mutation("ACCUMULO-1002");
 +    m.put("count2", "comments", "1");
 +    bw.addMutation(m);
 +    bw.addMutation(m);
 +
 +    bw.close();
 +
 +    IteratorSetting iterConfig = new IteratorSetting(10, 
SummingCombiner.class);
 +    SummingCombiner.setEncodingType(iterConfig, Type.STRING);
 +    SummingCombiner.setColumns(iterConfig, Collections.singletonList(new 
IteratorSetting.Column("count")));
 +
 +    IteratorSetting iterConfig2 = new IteratorSetting(10, 
SummingCombiner.class);
 +    SummingCombiner.setEncodingType(iterConfig2, Type.STRING);
 +    SummingCombiner.setColumns(iterConfig2, Collections.singletonList(new 
IteratorSetting.Column("count2", "comments")));
 +
 +    IteratorSetting iterConfig3 = new IteratorSetting(5, 
VersioningIterator.class);
 +    VersioningIterator.setMaxVersions(iterConfig3, 1);
 +
 +    Scanner scanner = conn.createScanner(tableName, new Authorizations());
 +    scanner.addScanIterator(iterConfig);
 +    scanner.setRange(new Range("ACCUMULO-1000"));
 +    scanner.fetchColumn(new Text("count"), new Text("comments"));
 +
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("3", entry.getValue().toString());
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig());
 +
 +    ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setValue("3"));
 +    cm0.put("count", "comments", "1");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus());
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("3", entry.getValue().toString());
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setIterators(iterConfig).setValue("3"));
 +    cm1.put("count", "comments", "1");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("4", entry.getValue().toString());
 +
 +    ConditionalMutation cm2 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setValue("4"));
 +    cm2.put("count", "comments", "1");
 +    Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus());
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("4", entry.getValue().toString());
 +
 +    // run test with multiple iterators passed in same batch and condition 
with two iterators
 +
 +    ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setIterators(iterConfig).setValue("4"));
 +    cm3.put("count", "comments", "1");
 +
 +    ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new 
Condition("count2", "comments").setIterators(iterConfig2).setValue("2"));
 +    cm4.put("count2", "comments", "1");
 +
 +    ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new 
Condition("count2", "comments").setIterators(iterConfig2, 
iterConfig3).setValue("2"));
 +    cm5.put("count2", "comments", "1");
 +
 +    Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, 
cm5).iterator());
 +    Map<String,Status> actual = new HashMap<String,Status>();
 +
 +    while (results.hasNext()) {
 +      Result result = results.next();
 +      String k = new String(result.getMutation().getRow());
 +      Assert.assertFalse("Did not expect to see multiple resultus for the 
row: " + k, actual.containsKey(k));
 +      actual.put(k, result.getStatus());
 +    }
 +
 +    Map<String,Status> expected = new HashMap<String,Status>();
 +    expected.put("ACCUMULO-1000", Status.ACCEPTED);
 +    expected.put("ACCUMULO-1001", Status.ACCEPTED);
 +    expected.put("ACCUMULO-1002", Status.REJECTED);
 +
 +    Assert.assertEquals(expected, actual);
 +
 +    cw.close();
 +  }
 +
 +  public static class AddingIterator extends WrappingIterator {
 +    long amount = 0;
 +
 +    @Override
 +    public Value getTopValue() {
 +      Value val = super.getTopValue();
 +      long l = Long.parseLong(val.toString());
 +      String newVal = (l + amount) + "";
-       return new Value(newVal.getBytes(Charsets.UTF_8));
++      return new Value(newVal.getBytes(UTF_8));
 +    }
 +
 +    @Override
 +    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
 +      this.setSource(source);
 +      amount = Long.parseLong(options.get("amount"));
 +    }
 +  }
 +
 +  public static class MultiplyingIterator extends WrappingIterator {
 +    long amount = 0;
 +
 +    @Override
 +    public Value getTopValue() {
 +      Value val = super.getTopValue();
 +      long l = Long.parseLong(val.toString());
 +      String newVal = l * amount + "";
-       return new Value(newVal.getBytes(Charsets.UTF_8));
++      return new Value(newVal.getBytes(UTF_8));
 +    }
 +
 +    @Override
 +    public void init(SortedKeyValueIterator<Key,Value> source, 
Map<String,String> options, IteratorEnvironment env) throws IOException {
 +      this.setSource(source);
 +      amount = Long.parseLong(options.get("amount"));
 +    }
 +  }
 +
 +  @Test
 +  public void testTableAndConditionIterators() throws Exception {
 +
 +    // test w/ table that has iterators configured
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    IteratorSetting aiConfig1 = new IteratorSetting(30, "AI1", 
AddingIterator.class);
 +    aiConfig1.addOption("amount", "2");
 +    IteratorSetting aiConfig2 = new IteratorSetting(35, "MI1", 
MultiplyingIterator.class);
 +    aiConfig2.addOption("amount", "3");
 +    IteratorSetting aiConfig3 = new IteratorSetting(40, "AI2", 
AddingIterator.class);
 +    aiConfig3.addOption("amount", "5");
 +
 +    conn.tableOperations().create(tableName);
 +
 +    BatchWriter bw = conn.createBatchWriter(tableName, new 
BatchWriterConfig());
 +
 +    Mutation m = new Mutation("ACCUMULO-1000");
 +    m.put("count", "comments", "6");
 +    bw.addMutation(m);
 +
 +    m = new Mutation("ACCUMULO-1001");
 +    m.put("count", "comments", "7");
 +    bw.addMutation(m);
 +
 +    m = new Mutation("ACCUMULO-1002");
 +    m.put("count", "comments", "8");
 +    bw.addMutation(m);
 +
 +    bw.close();
 +
 +    conn.tableOperations().attachIterator(tableName, aiConfig1, 
EnumSet.of(IteratorScope.scan));
 +    conn.tableOperations().offline(tableName, true);
 +    conn.tableOperations().online(tableName, true);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig());
 +
 +    ConditionalMutation cm6 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setValue("8"));
 +    cm6.put("count", "comments", "7");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus());
 +
 +    Scanner scanner = conn.createScanner(tableName, new Authorizations());
 +    scanner.setRange(new Range("ACCUMULO-1000"));
 +    scanner.fetchColumn(new Text("count"), new Text("comments"));
 +
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("9", entry.getValue().toString());
 +
 +    ConditionalMutation cm7 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setIterators(aiConfig2).setValue("27"));
 +    cm7.put("count", "comments", "8");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus());
 +
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("10", entry.getValue().toString());
 +
 +    ConditionalMutation cm8 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setIterators(aiConfig2, 
aiConfig3).setValue("35"));
 +    cm8.put("count", "comments", "9");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm8).getStatus());
 +
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("11", entry.getValue().toString());
 +
 +    ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new 
Condition("count", "comments").setIterators(aiConfig2).setValue("33"));
 +    cm3.put("count", "comments", "3");
 +
 +    ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new 
Condition("count", "comments").setIterators(aiConfig3).setValue("14"));
 +    cm4.put("count", "comments", "3");
 +
 +    ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new 
Condition("count", "comments").setIterators(aiConfig3).setValue("10"));
 +    cm5.put("count", "comments", "3");
 +
 +    Iterator<Result> results = cw.write(Arrays.asList(cm3, cm4, 
cm5).iterator());
 +    Map<String,Status> actual = new HashMap<String,Status>();
 +
 +    while (results.hasNext()) {
 +      Result result = results.next();
 +      String k = new String(result.getMutation().getRow());
 +      Assert.assertFalse("Did not expect to see multiple resultus for the 
row: " + k, actual.containsKey(k));
 +      actual.put(k, result.getStatus());
 +    }
 +
 +    cw.close();
 +
 +    Map<String,Status> expected = new HashMap<String,Status>();
 +    expected.put("ACCUMULO-1000", Status.ACCEPTED);
 +    expected.put("ACCUMULO-1001", Status.ACCEPTED);
 +    expected.put("ACCUMULO-1002", Status.REJECTED);
 +
 +    Assert.assertEquals(expected, actual);
 +
 +    cw.close();
 +  }
 +
 +  @Test
 +  public void testBatch() throws Exception {
 +
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    conn.tableOperations().create(tableName);
 +
 +    conn.securityOperations().changeUserAuthorizations(getAdminPrincipal(), 
new Authorizations("A", "B"));
 +
 +    ColumnVisibility cvab = new ColumnVisibility("A|B");
 +
 +    ArrayList<ConditionalMutation> mutations = new 
ArrayList<ConditionalMutation>();
 +
 +    ConditionalMutation cm0 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvab));
 +    cm0.put("name", "last", cvab, "doe");
 +    cm0.put("name", "first", cvab, "john");
 +    cm0.put("tx", "seq", cvab, "1");
 +    mutations.add(cm0);
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("59056", new 
Condition("tx", "seq").setVisibility(cvab));
 +    cm1.put("name", "last", cvab, "doe");
 +    cm1.put("name", "first", cvab, "jane");
 +    cm1.put("tx", "seq", cvab, "1");
 +    mutations.add(cm1);
 +
 +    ConditionalMutation cm2 = new ConditionalMutation("19059", new 
Condition("tx", "seq").setVisibility(cvab));
 +    cm2.put("name", "last", cvab, "doe");
 +    cm2.put("name", "first", cvab, "jack");
 +    cm2.put("tx", "seq", cvab, "1");
 +    mutations.add(cm2);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
 +    Iterator<Result> results = cw.write(mutations.iterator());
 +    int count = 0;
 +    while (results.hasNext()) {
 +      Result result = results.next();
 +      Assert.assertEquals(Status.ACCEPTED, result.getStatus());
 +      count++;
 +    }
 +
 +    Assert.assertEquals(3, count);
 +
 +    Scanner scanner = conn.createScanner(tableName, new Authorizations("A"));
 +    scanner.fetchColumn(new Text("tx"), new Text("seq"));
 +
 +    for (String row : new String[] {"99006", "59056", "19059"}) {
 +      scanner.setRange(new Range(row));
 +      Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
 +      Assert.assertEquals("1", entry.getValue().toString());
 +    }
 +
 +    TreeSet<Text> splits = new TreeSet<Text>();
 +    splits.add(new Text("7"));
 +    splits.add(new Text("3"));
 +    conn.tableOperations().addSplits(tableName, splits);
 +
 +    mutations.clear();
 +
 +    ConditionalMutation cm3 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvab).setValue("1"));
 +    cm3.put("name", "last", cvab, "Doe");
 +    cm3.put("tx", "seq", cvab, "2");
 +    mutations.add(cm3);
 +
 +    ConditionalMutation cm4 = new ConditionalMutation("59056", new 
Condition("tx", "seq").setVisibility(cvab));
 +    cm4.put("name", "last", cvab, "Doe");
 +    cm4.put("tx", "seq", cvab, "1");
 +    mutations.add(cm4);
 +
 +    ConditionalMutation cm5 = new ConditionalMutation("19059", new 
Condition("tx", "seq").setVisibility(cvab).setValue("2"));
 +    cm5.put("name", "last", cvab, "Doe");
 +    cm5.put("tx", "seq", cvab, "3");
 +    mutations.add(cm5);
 +
 +    results = cw.write(mutations.iterator());
 +    int accepted = 0;
 +    int rejected = 0;
 +    while (results.hasNext()) {
 +      Result result = results.next();
 +      if (new String(result.getMutation().getRow()).equals("99006")) {
 +        Assert.assertEquals(Status.ACCEPTED, result.getStatus());
 +        accepted++;
 +      } else {
 +        Assert.assertEquals(Status.REJECTED, result.getStatus());
 +        rejected++;
 +      }
 +    }
 +
 +    Assert.assertEquals("Expected only one accepted conditional mutation", 1, 
accepted);
 +    Assert.assertEquals("Expected two rejected conditional mutations", 2, 
rejected);
 +
 +    for (String row : new String[] {"59056", "19059"}) {
 +      scanner.setRange(new Range(row));
 +      Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
 +      Assert.assertEquals("1", entry.getValue().toString());
 +    }
 +
 +    scanner.setRange(new Range("99006"));
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("2", entry.getValue().toString());
 +
 +    scanner.clearColumns();
 +    scanner.fetchColumn(new Text("name"), new Text("last"));
 +    entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("Doe", entry.getValue().toString());
 +
 +    cw.close();
 +  }
 +
 +  @Test
 +  public void testBigBatch() throws Exception {
 +
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    conn.tableOperations().create(tableName);
 +    conn.tableOperations().addSplits(tableName, nss("2", "4", "6"));
 +
 +    sleepUninterruptibly(2, TimeUnit.SECONDS);
 +
 +    int num = 100;
 +
 +    ArrayList<byte[]> rows = new ArrayList<byte[]>(num);
 +    ArrayList<ConditionalMutation> cml = new 
ArrayList<ConditionalMutation>(num);
 +
 +    Random r = new Random();
 +    byte[] e = new byte[0];
 +
 +    for (int i = 0; i < num; i++) {
 +      rows.add(FastFormat.toZeroPaddedString(abs(r.nextLong()), 16, 16, e));
 +    }
 +
 +    for (int i = 0; i < num; i++) {
 +      ConditionalMutation cm = new ConditionalMutation(rows.get(i), new 
Condition("meta", "seq"));
 +
 +      cm.put("meta", "seq", "1");
 +      cm.put("meta", "tx", UUID.randomUUID().toString());
 +
 +      cml.add(cm);
 +    }
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig());
 +
 +    Iterator<Result> results = cw.write(cml.iterator());
 +
 +    int count = 0;
 +
 +    // TODO check got each row back
 +    while (results.hasNext()) {
 +      Result result = results.next();
 +      Assert.assertEquals(Status.ACCEPTED, result.getStatus());
 +      count++;
 +    }
 +
 +    Assert.assertEquals("Did not receive the expected number of results", 
num, count);
 +
 +    ArrayList<ConditionalMutation> cml2 = new 
ArrayList<ConditionalMutation>(num);
 +
 +    for (int i = 0; i < num; i++) {
 +      ConditionalMutation cm = new ConditionalMutation(rows.get(i), new 
Condition("meta", "seq").setValue("1"));
 +
 +      cm.put("meta", "seq", "2");
 +      cm.put("meta", "tx", UUID.randomUUID().toString());
 +
 +      cml2.add(cm);
 +    }
 +
 +    count = 0;
 +
 +    results = cw.write(cml2.iterator());
 +
 +    while (results.hasNext()) {
 +      Result result = results.next();
 +      Assert.assertEquals(Status.ACCEPTED, result.getStatus());
 +      count++;
 +    }
 +
 +    Assert.assertEquals("Did not receive the expected number of results", 
num, count);
 +
 +    cw.close();
 +  }
 +
 +  @Test
 +  public void testBatchErrors() throws Exception {
 +
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    conn.tableOperations().create(tableName);
 +    conn.tableOperations().addConstraint(tableName, 
AlphaNumKeyConstraint.class.getName());
 +    conn.tableOperations().clone(tableName, tableName + "_clone", true, new 
HashMap<String,String>(), new HashSet<String>());
 +
 +    conn.securityOperations().changeUserAuthorizations(getAdminPrincipal(), 
new Authorizations("A", "B"));
 +
 +    ColumnVisibility cvaob = new ColumnVisibility("A|B");
 +    ColumnVisibility cvaab = new ColumnVisibility("A&B");
 +
 +    switch ((new Random()).nextInt(3)) {
 +      case 1:
 +        conn.tableOperations().addSplits(tableName, nss("6"));
 +        break;
 +      case 2:
 +        conn.tableOperations().addSplits(tableName, nss("2", "95"));
 +        break;
 +    }
 +
 +    ArrayList<ConditionalMutation> mutations = new 
ArrayList<ConditionalMutation>();
 +
 +    ConditionalMutation cm0 = new ConditionalMutation("99006", new 
Condition("tx", "seq").setVisibility(cvaob));
 +    cm0.put("name+", "last", cvaob, "doe");
 +    cm0.put("name", "first", cvaob, "john");
 +    cm0.put("tx", "seq", cvaob, "1");
 +    mutations.add(cm0);
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("59056", new 
Condition("tx", "seq").setVisibility(cvaab));
 +    cm1.put("name", "last", cvaab, "doe");
 +    cm1.put("name", "first", cvaab, "jane");
 +    cm1.put("tx", "seq", cvaab, "1");
 +    mutations.add(cm1);
 +
 +    ConditionalMutation cm2 = new ConditionalMutation("19059", new 
Condition("tx", "seq").setVisibility(cvaob));
 +    cm2.put("name", "last", cvaob, "doe");
 +    cm2.put("name", "first", cvaob, "jack");
 +    cm2.put("tx", "seq", cvaob, "1");
 +    mutations.add(cm2);
 +
 +    ConditionalMutation cm3 = new ConditionalMutation("90909", new 
Condition("tx", "seq").setVisibility(cvaob).setValue("1"));
 +    cm3.put("name", "last", cvaob, "doe");
 +    cm3.put("name", "first", cvaob, "john");
 +    cm3.put("tx", "seq", cvaob, "2");
 +    mutations.add(cm3);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig().setAuthorizations(new Authorizations("A")));
 +    Iterator<Result> results = cw.write(mutations.iterator());
 +    HashSet<String> rows = new HashSet<String>();
 +    while (results.hasNext()) {
 +      Result result = results.next();
 +      String row = new String(result.getMutation().getRow());
 +      if (row.equals("19059")) {
 +        Assert.assertEquals(Status.ACCEPTED, result.getStatus());
 +      } else if (row.equals("59056")) {
 +        Assert.assertEquals(Status.INVISIBLE_VISIBILITY, result.getStatus());
 +      } else if (row.equals("99006")) {
 +        Assert.assertEquals(Status.VIOLATED, result.getStatus());
 +      } else if (row.equals("90909")) {
 +        Assert.assertEquals(Status.REJECTED, result.getStatus());
 +      }
 +      rows.add(row);
 +    }
 +
 +    Assert.assertEquals(4, rows.size());
 +
 +    Scanner scanner = conn.createScanner(tableName, new Authorizations("A"));
 +    scanner.fetchColumn(new Text("tx"), new Text("seq"));
 +
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(scanner);
 +    Assert.assertEquals("1", entry.getValue().toString());
 +
 +    cw.close();
 +  }
 +
 +  @Test
 +  public void testSameRow() throws Exception {
 +    // test multiple mutations for same row in same batch
 +
 +    Connector conn = getConnector();
 +    String tableName = getUniqueNames(1)[0];
 +
 +    conn.tableOperations().create(tableName);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig());
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("r1", new 
Condition("tx", "seq"));
 +    cm1.put("tx", "seq", "1");
 +    cm1.put("data", "x", "a");
 +
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus());
 +
 +    ConditionalMutation cm2 = new ConditionalMutation("r1", new 
Condition("tx", "seq").setValue("1"));
 +    cm2.put("tx", "seq", "2");
 +    cm2.put("data", "x", "b");
 +
 +    ConditionalMutation cm3 = new ConditionalMutation("r1", new 
Condition("tx", "seq").setValue("1"));
 +    cm3.put("tx", "seq", "2");
 +    cm3.put("data", "x", "c");
 +
 +    ConditionalMutation cm4 = new ConditionalMutation("r1", new 
Condition("tx", "seq").setValue("1"));
 +    cm4.put("tx", "seq", "2");
 +    cm4.put("data", "x", "d");
 +
 +    Iterator<Result> results = cw.write(Arrays.asList(cm2, cm3, 
cm4).iterator());
 +
 +    int accepted = 0;
 +    int rejected = 0;
 +    int total = 0;
 +
 +    while (results.hasNext()) {
 +      Status status = results.next().getStatus();
 +      if (status == Status.ACCEPTED)
 +        accepted++;
 +      if (status == Status.REJECTED)
 +        rejected++;
 +      total++;
 +    }
 +
 +    Assert.assertEquals("Expected one accepted result", 1, accepted);
 +    Assert.assertEquals("Expected two rejected results", 2, rejected);
 +    Assert.assertEquals("Expected three total results", 3, total);
 +
 +    cw.close();
 +  }
 +
 +  private static class Stats {
 +
 +    ByteSequence row = null;
 +    int seq;
 +    long sum;
 +    int data[] = new int[10];
 +
 +    public Stats(Iterator<Entry<Key,Value>> iterator) {
 +      while (iterator.hasNext()) {
 +        Entry<Key,Value> entry = iterator.next();
 +
 +        if (row == null)
 +          row = entry.getKey().getRowData();
 +
 +        String cf = entry.getKey().getColumnFamilyData().toString();
 +        String cq = entry.getKey().getColumnQualifierData().toString();
 +
 +        if (cf.equals("data")) {
 +          data[Integer.parseInt(cq)] = 
Integer.parseInt(entry.getValue().toString());
 +        } else if (cf.equals("meta")) {
 +          if (cq.equals("sum")) {
 +            sum = Long.parseLong(entry.getValue().toString());
 +          } else if (cq.equals("seq")) {
 +            seq = Integer.parseInt(entry.getValue().toString());
 +          }
 +        }
 +      }
 +
 +      long sum2 = 0;
 +
 +      for (int datum : data) {
 +        sum2 += datum;
 +      }
 +
 +      Assert.assertEquals(sum2, sum);
 +    }
 +
 +    public Stats(ByteSequence row) {
 +      this.row = row;
 +      for (int i = 0; i < data.length; i++) {
 +        this.data[i] = 0;
 +      }
 +      this.seq = -1;
 +      this.sum = 0;
 +    }
 +
 +    void set(int index, int value) {
 +      sum -= data[index];
 +      sum += value;
 +      data[index] = value;
 +    }
 +
 +    ConditionalMutation toMutation() {
 +      Condition cond = new Condition("meta", "seq");
 +      if (seq >= 0)
 +        cond.setValue(seq + "");
 +
 +      ConditionalMutation cm = new ConditionalMutation(row, cond);
 +
 +      cm.put("meta", "seq", (seq + 1) + "");
 +      cm.put("meta", "sum", (sum) + "");
 +
 +      for (int i = 0; i < data.length; i++) {
 +        cm.put("data", i + "", data[i] + "");
 +      }
 +
 +      return cm;
 +    }
 +
 +    @Override
 +    public String toString() {
 +      return row + " " + seq + " " + sum;
 +    }
 +  }
 +
 +  private static class MutatorTask implements Runnable {
 +    String table;
 +    ArrayList<ByteSequence> rows;
 +    ConditionalWriter cw;
 +    Connector conn;
 +    AtomicBoolean failed;
 +
 +    public MutatorTask(String table, Connector conn, ArrayList<ByteSequence> 
rows, ConditionalWriter cw, AtomicBoolean failed) {
 +      this.table = table;
 +      this.rows = rows;
 +      this.conn = conn;
 +      this.cw = cw;
 +      this.failed = failed;
 +    }
 +
 +    @Override
 +    public void run() {
 +      try {
 +        Random rand = new Random();
 +
 +        Scanner scanner = new IsolatedScanner(conn.createScanner(table, 
Authorizations.EMPTY));
 +
 +        for (int i = 0; i < 20; i++) {
 +          int numRows = rand.nextInt(10) + 1;
 +
 +          ArrayList<ByteSequence> changes = new 
ArrayList<ByteSequence>(numRows);
 +          ArrayList<ConditionalMutation> mutations = new 
ArrayList<ConditionalMutation>();
 +
 +          for (int j = 0; j < numRows; j++)
 +            changes.add(rows.get(rand.nextInt(rows.size())));
 +
 +          for (ByteSequence row : changes) {
 +            scanner.setRange(new Range(row.toString()));
 +            Stats stats = new Stats(scanner.iterator());
 +            stats.set(rand.nextInt(10), rand.nextInt(Integer.MAX_VALUE));
 +            mutations.add(stats.toMutation());
 +          }
 +
 +          ArrayList<ByteSequence> changed = new 
ArrayList<ByteSequence>(numRows);
 +          Iterator<Result> results = cw.write(mutations.iterator());
 +          while (results.hasNext()) {
 +            Result result = results.next();
 +            changed.add(new ArrayByteSequence(result.getMutation().getRow()));
 +          }
 +
 +          Collections.sort(changes);
 +          Collections.sort(changed);
 +
 +          Assert.assertEquals(changes, changed);
 +
 +        }
 +
 +      } catch (Exception e) {
 +        log.error("{}", e.getMessage(), e);
 +        failed.set(true);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testThreads() throws Exception {
 +    // test multiple threads using a single conditional writer
 +
 +    String table = getUniqueNames(1)[0];
 +    Connector conn = getConnector();
 +
 +    conn.tableOperations().create(table);
 +
 +    Random rand = new Random();
 +
 +    switch (rand.nextInt(3)) {
 +      case 1:
 +        conn.tableOperations().addSplits(table, nss("4"));
 +        break;
 +      case 2:
 +        conn.tableOperations().addSplits(table, nss("3", "5"));
 +        break;
 +    }
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(table, new 
ConditionalWriterConfig());
 +
 +    ArrayList<ByteSequence> rows = new ArrayList<ByteSequence>();
 +
 +    for (int i = 0; i < 1000; i++) {
 +      rows.add(new 
ArrayByteSequence(FastFormat.toZeroPaddedString(abs(rand.nextLong()), 16, 16, 
new byte[0])));
 +    }
 +
 +    ArrayList<ConditionalMutation> mutations = new 
ArrayList<ConditionalMutation>();
 +
 +    for (ByteSequence row : rows)
 +      mutations.add(new Stats(row).toMutation());
 +
 +    ArrayList<ByteSequence> rows2 = new ArrayList<ByteSequence>();
 +    Iterator<Result> results = cw.write(mutations.iterator());
 +    while (results.hasNext()) {
 +      Result result = results.next();
 +      Assert.assertEquals(Status.ACCEPTED, result.getStatus());
 +      rows2.add(new ArrayByteSequence(result.getMutation().getRow()));
 +    }
 +
 +    Collections.sort(rows);
 +    Collections.sort(rows2);
 +
 +    Assert.assertEquals(rows, rows2);
 +
 +    AtomicBoolean failed = new AtomicBoolean(false);
 +
 +    ExecutorService tp = Executors.newFixedThreadPool(5);
 +    for (int i = 0; i < 5; i++) {
 +      tp.submit(new MutatorTask(table, conn, rows, cw, failed));
 +    }
 +
 +    tp.shutdown();
 +
 +    while (!tp.isTerminated()) {
 +      tp.awaitTermination(1, TimeUnit.MINUTES);
 +    }
 +
 +    Assert.assertFalse("A MutatorTask failed with an exception", 
failed.get());
 +
 +    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
 +
 +    RowIterator rowIter = new RowIterator(scanner);
 +
 +    while (rowIter.hasNext()) {
 +      Iterator<Entry<Key,Value>> row = rowIter.next();
 +      new Stats(row);
 +    }
 +  }
 +
 +  private SortedSet<Text> nss(String... splits) {
 +    TreeSet<Text> ret = new TreeSet<Text>();
 +    for (String split : splits)
 +      ret.add(new Text(split));
 +
 +    return ret;
 +  }
 +
 +  @Test
 +  public void testSecurity() throws Exception {
 +    // test against table user does not have read and/or write permissions for
 +    Connector conn = getConnector();
 +    String user = null;
 +    ClientConfiguration clientConf = cluster.getClientConfig();
 +    final boolean saslEnabled = 
clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false);
 +
 +    // Create a new user
 +    ClusterUser user1 = getUser(0);
 +    user = user1.getPrincipal();
 +    if (saslEnabled) {
 +      conn.securityOperations().createLocalUser(user, null);
 +    } else {
 +      conn.securityOperations().createLocalUser(user, new 
PasswordToken(user1.getPassword()));
 +    }
 +
 +    String[] tables = getUniqueNames(3);
 +    String table1 = tables[0], table2 = tables[1], table3 = tables[2];
 +
 +    // Create three tables
 +    conn.tableOperations().create(table1);
 +    conn.tableOperations().create(table2);
 +    conn.tableOperations().create(table3);
 +
 +    // Grant R on table1, W on table2, R/W on table3
 +    conn.securityOperations().grantTablePermission(user, table1, 
TablePermission.READ);
 +    conn.securityOperations().grantTablePermission(user, table2, 
TablePermission.WRITE);
 +    conn.securityOperations().grantTablePermission(user, table3, 
TablePermission.READ);
 +    conn.securityOperations().grantTablePermission(user, table3, 
TablePermission.WRITE);
 +
 +    // Login as the user
 +    Connector conn2 = conn.getInstance().getConnector(user, user1.getToken());
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("r1", new 
Condition("tx", "seq"));
 +    cm1.put("tx", "seq", "1");
 +    cm1.put("data", "x", "a");
 +
 +    ConditionalWriter cw1 = conn2.createConditionalWriter(table1, new 
ConditionalWriterConfig());
 +    ConditionalWriter cw2 = conn2.createConditionalWriter(table2, new 
ConditionalWriterConfig());
 +    ConditionalWriter cw3 = conn2.createConditionalWriter(table3, new 
ConditionalWriterConfig());
 +
 +    // Should be able to conditional-update a table we have R/W on
 +    Assert.assertEquals(Status.ACCEPTED, cw3.write(cm1).getStatus());
 +
 +    // Conditional-update to a table we only have read on should fail
 +    try {
 +      Status status = cw1.write(cm1).getStatus();
 +      Assert.fail("Expected exception writing conditional mutation to table the user doesn't have write access to, Got status: " + status);
 +    } catch (AccumuloSecurityException ase) {
 +
 +    }
 +
 +    // Conditional-update to a table we only have writer on should fail
 +    try {
 +      Status status = cw2.write(cm1).getStatus();
 +      Assert.fail("Expected exception writing conditional mutation to table the user doesn't have read access to. Got status: " + status);
 +    } catch (AccumuloSecurityException ase) {
 +
 +    }
 +  }
 +
 +  @Test
 +  public void testTimeout() throws Exception {
 +    Connector conn = getConnector();
 +
 +    String table = getUniqueNames(1)[0];
 +
 +    conn.tableOperations().create(table);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(table, new 
ConditionalWriterConfig().setTimeout(3, TimeUnit.SECONDS));
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("r1", new 
Condition("tx", "seq"));
 +    cm1.put("tx", "seq", "1");
 +    cm1.put("data", "x", "a");
 +
 +    Assert.assertEquals(cw.write(cm1).getStatus(), Status.ACCEPTED);
 +
 +    IteratorSetting is = new IteratorSetting(5, SlowIterator.class);
 +    SlowIterator.setSeekSleepTime(is, 5000);
 +
 +    ConditionalMutation cm2 = new ConditionalMutation("r1", new 
Condition("tx", "seq").setValue("1").setIterators(is));
 +    cm2.put("tx", "seq", "2");
 +    cm2.put("data", "x", "b");
 +
 +    Assert.assertEquals(cw.write(cm2).getStatus(), Status.UNKNOWN);
 +
 +    Scanner scanner = conn.createScanner(table, Authorizations.EMPTY);
 +
 +    for (Entry<Key,Value> entry : scanner) {
 +      String cf = entry.getKey().getColumnFamilyData().toString();
 +      String cq = entry.getKey().getColumnQualifierData().toString();
 +      String val = entry.getValue().toString();
 +
 +      if (cf.equals("tx") && cq.equals("seq"))
 +        Assert.assertEquals("Unexpected value in tx:seq", "1", val);
 +      else if (cf.equals("data") && cq.equals("x"))
 +        Assert.assertEquals("Unexpected value in data:x", "a", val);
 +      else
 +        Assert.fail("Saw unexpected column family and qualifier: " + entry);
 +    }
 +
 +    ConditionalMutation cm3 = new ConditionalMutation("r1", new 
Condition("tx", "seq").setValue("1"));
 +    cm3.put("tx", "seq", "2");
 +    cm3.put("data", "x", "b");
 +
 +    Assert.assertEquals(cw.write(cm3).getStatus(), Status.ACCEPTED);
 +
 +    cw.close();
 +  }
 +
 +  @Test
 +  public void testDeleteTable() throws Exception {
 +    String table = getUniqueNames(1)[0];
 +    Connector conn = getConnector();
 +
 +    try {
 +      conn.createConditionalWriter(table, new ConditionalWriterConfig());
 +      Assert.fail("Creating conditional writer for table that doesn't exist should fail");
 +    } catch (TableNotFoundException e) {}
 +
 +    conn.tableOperations().create(table);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(table, new 
ConditionalWriterConfig());
 +
 +    conn.tableOperations().delete(table);
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("r1", new 
Condition("tx", "seq"));
 +    cm1.put("tx", "seq", "1");
 +    cm1.put("data", "x", "a");
 +
 +    Result result = cw.write(cm1);
 +
 +    try {
 +      Status status = result.getStatus();
 +      Assert.fail("Expected exception writing conditional mutation to deleted table. Got status: " + status);
 +    } catch (AccumuloException ae) {
 +      Assert.assertEquals(TableDeletedException.class, 
ae.getCause().getClass());
 +    }
 +  }
 +
 +  @Test
 +  public void testOffline() throws Exception {
 +    String table = getUniqueNames(1)[0];
 +    Connector conn = getConnector();
 +
 +    conn.tableOperations().create(table);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(table, new 
ConditionalWriterConfig());
 +
 +    conn.tableOperations().offline(table, true);
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("r1", new 
Condition("tx", "seq"));
 +    cm1.put("tx", "seq", "1");
 +    cm1.put("data", "x", "a");
 +
 +    Result result = cw.write(cm1);
 +
 +    try {
 +      Status status = result.getStatus();
 +      Assert.fail("Expected exception writing conditional mutation to offline table. Got status: " + status);
 +    } catch (AccumuloException ae) {
 +      Assert.assertEquals(TableOfflineException.class, 
ae.getCause().getClass());
 +    }
 +
 +    cw.close();
 +
 +    try {
 +      conn.createConditionalWriter(table, new ConditionalWriterConfig());
 +      Assert.fail("Expected exception creating conditional writer to offline table");
 +    } catch (TableOfflineException e) {}
 +  }
 +
 +  @Test
 +  public void testError() throws Exception {
 +    String table = getUniqueNames(1)[0];
 +    Connector conn = getConnector();
 +
 +    conn.tableOperations().create(table);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(table, new 
ConditionalWriterConfig());
 +
 +    IteratorSetting iterSetting = new IteratorSetting(5, BadIterator.class);
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("r1", new 
Condition("tx", "seq").setIterators(iterSetting));
 +    cm1.put("tx", "seq", "1");
 +    cm1.put("data", "x", "a");
 +
 +    Result result = cw.write(cm1);
 +
 +    try {
 +      Status status = result.getStatus();
 +      Assert.fail("Expected exception using iterator which throws an error, Got status: " + status);
 +    } catch (AccumuloException ae) {
 +
 +    }
 +
 +    cw.close();
 +  }
 +
 +  @Test(expected = IllegalArgumentException.class)
 +  public void testNoConditions() throws AccumuloException, 
AccumuloSecurityException, TableExistsException, TableNotFoundException {
 +    String table = getUniqueNames(1)[0];
 +    Connector conn = getConnector();
 +
 +    conn.tableOperations().create(table);
 +
 +    ConditionalWriter cw = conn.createConditionalWriter(table, new 
ConditionalWriterConfig());
 +
 +    ConditionalMutation cm1 = new ConditionalMutation("r1");
 +    cm1.put("tx", "seq", "1");
 +    cm1.put("data", "x", "a");
 +
 +    cw.write(cm1);
 +  }
 +
 +  @Test
 +  public void testTrace() throws Exception {
 +    // Need to add a getClientConfig() to AccumuloCluster
 +    Assume.assumeTrue(getClusterType() == ClusterType.MINI);
 +    Process tracer = null;
 +    Connector conn = getConnector();
 +    AccumuloCluster cluster = getCluster();
 +    MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) cluster;
 +    if (!conn.tableOperations().exists("trace")) {
 +      tracer = mac.exec(TraceServer.class);
 +      while (!conn.tableOperations().exists("trace")) {
 +        sleepUninterruptibly(1, TimeUnit.SECONDS);
 +      }
 +    }
 +
 +    String tableName = getUniqueNames(1)[0];
 +    conn.tableOperations().create(tableName);
 +
 +    DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig());
 +    sleepUninterruptibly(1, TimeUnit.SECONDS);
 +    Span root = Trace.on("traceTest");
 +    ConditionalWriter cw = conn.createConditionalWriter(tableName, new 
ConditionalWriterConfig());
 +
 +    // mutation conditional on column tx:seq not exiting
 +    ConditionalMutation cm0 = new ConditionalMutation("99006", new 
Condition("tx", "seq"));
 +    cm0.put("name", "last", "doe");
 +    cm0.put("name", "first", "john");
 +    cm0.put("tx", "seq", "1");
 +    Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus());
 +    root.stop();
 +
 +    final Scanner scanner = conn.createScanner("trace", Authorizations.EMPTY);
 +    scanner.setRange(new Range(new Text(Long.toHexString(root.traceId()))));
 +    loop: while (true) {
 +      final StringBuffer finalBuffer = new StringBuffer();
 +      int traceCount = TraceDump.printTrace(scanner, new Printer() {
 +        @Override
 +        public void print(final String line) {
 +          try {
 +            finalBuffer.append(line).append("\n");
 +          } catch (Exception ex) {
 +            throw new RuntimeException(ex);
 +          }
 +        }
 +      });
 +      String traceOutput = finalBuffer.toString();
 +      log.info("Trace output:" + traceOutput);
 +      if (traceCount > 0) {
 +        int lastPos = 0;
 +        for (String part : "traceTest, startScan,startConditionalUpdate,conditionalUpdate,Check conditions,apply conditional mutations".split(",")) {
 +          log.info("Looking in trace output for '" + part + "'");
 +          int pos = traceOutput.indexOf(part);
 +          if (-1 == pos) {
 +            log.info("Trace output doesn't contain '" + part + "'");
 +            Thread.sleep(1000);
 +            break loop;
 +          }
 +          assertTrue("Did not find '" + part + "' in output", pos > 0);
 +          assertTrue("'" + part + "' occurred earlier than the previous element unexpectedly", pos > lastPos);
 +          lastPos = pos;
 +        }
 +        break;
 +      } else {
 +        log.info("Ignoring trace output as traceCount not greater than zero: " + traceCount);
 +        Thread.sleep(1000);
 +      }
 +    }
 +    if (tracer != null) {
 +      tracer.destroy();
 +    }
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/ShellConfigIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/ShellConfigIT.java
index 4f83668,0000000..ae2e4cc
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellConfigIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellConfigIT.java
@@@ -1,103 -1,0 +1,103 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
++import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.io.File;
- import java.nio.charset.StandardCharsets;
 +
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 +import org.apache.accumulo.core.client.security.tokens.KerberosToken;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import 
org.apache.accumulo.harness.conf.StandaloneAccumuloClusterConfiguration;
 +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 +import org.apache.accumulo.test.ShellServerIT.TestShell;
 +import org.junit.After;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +public class ShellConfigIT extends AccumuloClusterHarness {
 +  @Override
 +  public int defaultTimeoutSeconds() {
 +    return 30;
 +  }
 +
 +  private String origPropValue;
 +
 +  @Before
 +  public void checkProperty() throws Exception {
 +    Connector conn = getConnector();
 +    // TABLE_VOLUME_CHOOSER is a valid property that can be updated in ZK, whereas the crypto properties are not.
 +    // This lets us run this test more generically rather than forcibly needing to update some property in accumulo-site.xml
 +    origPropValue = 
conn.instanceOperations().getSystemConfiguration().get(Property.TABLE_VOLUME_CHOOSER.getKey());
 +    
conn.instanceOperations().setProperty(Property.TABLE_VOLUME_CHOOSER.getKey(), 
FairVolumeChooser.class.getName());
 +  }
 +
 +  @After
 +  public void resetProperty() throws Exception {
 +    if (null != origPropValue) {
 +      Connector conn = getConnector();
 +      
conn.instanceOperations().setProperty(Property.TABLE_VOLUME_CHOOSER.getKey(), 
origPropValue);
 +    }
 +  }
 +
 +  @Test
 +  public void experimentalPropTest() throws Exception {
 +    // ensure experimental props do not show up in config output unless set
 +
 +    AuthenticationToken token = getAdminToken();
 +    File clientConfFile = null;
 +    switch (getClusterType()) {
 +      case MINI:
 +        MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster();
 +        clientConfFile = mac.getConfig().getClientConfFile();
 +        break;
 +      case STANDALONE:
 +        StandaloneAccumuloClusterConfiguration standaloneConf = 
(StandaloneAccumuloClusterConfiguration) getClusterConfiguration();
 +        clientConfFile = standaloneConf.getClientConfFile();
 +        break;
 +      default:
 +        Assert.fail("Unknown cluster type");
 +    }
 +
 +    Assert.assertNotNull(clientConfFile);
 +
 +    TestShell ts = null;
 +    if (token instanceof PasswordToken) {
-       String passwd = new String(((PasswordToken) token).getPassword(), 
StandardCharsets.UTF_8);
++      String passwd = new String(((PasswordToken) token).getPassword(), 
UTF_8);
 +      ts = new TestShell(getAdminPrincipal(), passwd, 
getCluster().getInstanceName(), getCluster().getZooKeepers(), clientConfFile);
 +    } else if (token instanceof KerberosToken) {
 +      ts = new TestShell(getAdminPrincipal(), null, 
getCluster().getInstanceName(), getCluster().getZooKeepers(), clientConfFile);
 +    } else {
 +      Assert.fail("Unknown token type");
 +    }
 +
 +    assertTrue(Property.TABLE_VOLUME_CHOOSER.isExperimental());
 +    assertTrue(Property.CRYPTO_CIPHER_ALGORITHM_NAME.isExperimental());
 +
 +    String configOutput = ts.exec("config");
 +
 +    assertTrue(configOutput.contains(Property.TABLE_VOLUME_CHOOSER.getKey()));
 +    
assertFalse(configOutput.contains(Property.CRYPTO_CIPHER_ALGORITHM_NAME.getKey()));
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
index 9cc3dc0,0000000..4829c46
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java
@@@ -1,120 -1,0 +1,120 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.test;
 +
- import static com.google.common.base.Charsets.UTF_8;
++import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
 +
 +import java.util.ArrayList;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.ClientConfiguration;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.impl.ClientContext;
 +import org.apache.accumulo.core.client.impl.Credentials;
 +import org.apache.accumulo.core.client.impl.ThriftTransportKey;
 +import org.apache.accumulo.core.client.impl.ThriftTransportPool;
 +import org.apache.accumulo.core.conf.DefaultConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.util.ServerServices;
 +import org.apache.accumulo.core.util.ServerServices.Service;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.zookeeper.ZooCache;
 +import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.thrift.transport.TTransport;
 +import org.apache.thrift.transport.TTransportException;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * Test that {@link ThriftTransportPool} actually adheres to the cachedConnection argument
 + */
 +public class TransportCachingIT extends AccumuloClusterHarness {
 +  private static final Logger log = 
LoggerFactory.getLogger(TransportCachingIT.class);
 +
 +  @Test
 +  public void testCachedTransport() {
 +    Connector conn = getConnector();
 +    Instance instance = conn.getInstance();
 +    ClientConfiguration clientConf = cluster.getClientConfig();
 +    ClientContext context = new ClientContext(instance, new 
Credentials(getAdminPrincipal(), getAdminToken()), clientConf);
 +    long rpcTimeout = 
DefaultConfiguration.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue());
 +
 +    // create list of servers
 +    ArrayList<ThriftTransportKey> servers = new 
ArrayList<ThriftTransportKey>();
 +
 +    // add tservers
 +    ZooCache zc = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), 
instance.getZooKeepersSessionTimeOut());
 +    for (String tserver : zc.getChildren(ZooUtil.getRoot(instance) + 
Constants.ZTSERVERS)) {
 +      String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + 
tserver;
 +      byte[] data = ZooUtil.getLockData(zc, path);
 +      if (data != null) {
 +        String strData = new String(data, UTF_8);
 +        if (!strData.equals("master"))
 +          servers.add(new ThriftTransportKey(new 
ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context));
 +      }
 +    }
 +
 +    ThriftTransportPool pool = ThriftTransportPool.getInstance();
 +    TTransport first = null;
 +    while (null == first) {
 +      try {
 +        // Get a transport (cached or not)
 +        first = pool.getAnyTransport(servers, true).getSecond();
 +      } catch (TTransportException e) {
 +        log.warn("Failed to obtain transport to " + servers);
 +      }
 +    }
 +
 +    assertNotNull(first);
 +    // Return it to unreserve it
 +    pool.returnTransport(first);
 +
 +    TTransport second = null;
 +    while (null == second) {
 +      try {
 +        // Get a cached transport (should be the first)
 +        second = pool.getAnyTransport(servers, true).getSecond();
 +      } catch (TTransportException e) {
 +        log.warn("Failed obtain 2nd transport to " + servers);
 +      }
 +    }
 +
 +    // We should get the same transport
 +    assertTrue("Expected the first and second to be the same instance", first 
== second);
 +    // Return the 2nd
 +    pool.returnTransport(second);
 +
 +    TTransport third = null;
 +    while (null == third) {
 +      try {
 +        // Get a non-cached transport
 +        third = pool.getAnyTransport(servers, false).getSecond();
 +      } catch (TTransportException e) {
 +        log.warn("Failed obtain 2nd transport to " + servers);
 +      }
 +    }
 +
 +    assertFalse("Expected second and third transport to be different instances", second == third);
 +    pool.returnTransport(third);
 +  }
 +}

Reply via email to